I am trying to use the KeyBy operator as follows:

    Pattern<MyEvent, ?> myEventsCEPPattern =
        Pattern.<MyEvent>begin("FirstEvent")
            .subtype(MyEvent.class)
            .next("SecondEvent")
            .subtype(MyEvent.class)
            .within(Time.hours(1));

    PatternStream<MyEvent> myEventsPatternStream =
        CEP.pattern(
            meEvents.keyBy("field1", "field6"),
            myEventsCEPPattern
        );

When I run the program, I get the following exception:

    The program finished with the following exception:

    This type (GenericType<com.events.MyEvent>) cannot be used as key.
        org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:330)
        org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:294)

MyEvent is a POJO. What am I doing wrong?

Here is the relevant code:

    public abstract class AbstractEvent {
        private String field1;
        private String field2;
        private String field3;
        private String field4;
        private Timestamp eventTimestmp;

        public AbstractEvent(String field1, String field2, String field3,
                             String field4, Timestamp eventTimestmp) {
            this.field1 = field1;
            this.field2 = field2;
            this.field3 = field3;
            this.field4 = field4;
            this.eventTimestmp = eventTimestmp;
        }

        public AbstractEvent() {
        }

        public String getField1() {
            return field1;
        }

        public AbstractEvent setField1(String field1) {
            this.field1 = field1;
            return this;
        }

        public String getField2() {
            return field2;
        }

        public AbstractEvent setField2(String field2) {
            this.field2 = field2;
            return this;
        }

        public String getField3() {
            return field3;
        }

        public AbstractEvent setField3(String field3) {
            this.field3 = field3;
            return this;
        }

        public String getField4() {
            return field4;
        }

        public AbstractEvent setField4(String field4) {
            this.field4 = field4;
            return this;
        }

        public Timestamp getEventTimestmp() {
            return eventTimestmp;
        }

        public AbstractEvent setEventTimestmp(Timestamp eventTimestmp) {
            this.eventTimestmp = eventTimestmp;
            return this;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof AbstractEvent)) {
                return false;
            }

            AbstractEvent that = (AbstractEvent) o;

            if (!getField1().equals(that.getField1())) {
                return false;
            }
            if (!getField2().equals(that.getField2())) {
                return false;
            }
            if (!getField3().equals(that.getField3())) {
                return false;
            }
            if (!getField4().equals(that.getField4())) {
                return false;
            }
            return getEventTimestmp().equals(that.getEventTimestmp());
        }

        @SuppressWarnings({"MagicNumber"})
        @Override
        public int hashCode() {
            int result = getField1().hashCode();
            result = 31 * result + getField2().hashCode();
            result = 31 * result + getField3().hashCode();
            result = 31 * result + getField4().hashCode();
            result = 31 * result + getEventTimestmp().hashCode();
            return result;
        }

        @Override
        public String toString() {
            return "AbstractEvent{"
                + "field1='" + field1 + '\''
                + ", field2='" + field2 + '\''
                + ", field3='" + field3 + '\''
                + ", field4='" + field4 + '\''
                + ", eventTimestmp=" + eventTimestmp
                + '}';
        }
    }

    public class MyEvent extends AbstractEvent {

        private Timestamp responseTime;
        private String field5;
        private String field6;
        private int field7;
        private String field8;
        private String field9;

        public MyEvent(String field1, String field2, String field3, String field4,
                       Timestamp eventTimestmp, Timestamp responseTime,
                       String field5, String field6, int field7, String field8,
                       String field9) {
            super(field1, field2, field3, field4, eventTimestmp);
            this.responseTime = responseTime;
            this.field5 = field5;
            this.field6 = field6;
            this.field7 = field7;
            this.field8 = field8;
            this.field9 = field9;
        }

        public MyEvent() {
            super();
        }

        public int getField7() {
            return field7;
        }

        public String getField8() {
            return field8;
        }

        public String getField9() {
            return field9;
        }

        public String getField5() {
            return field5;
        }

        public String getField6() {
            return field6;
        }

        public Timestamp getResponseTime() {
            return responseTime;
        }

        public MyEvent setResponseTime(Timestamp responseTime) {
            this.responseTime = responseTime;
            return this;
        }

        public MyEvent setField7(int field7) {
            this.field7 = field7;
            return this;
        }

        public MyEvent setField8(String field8) {
            this.field8 = field8;
            return this;
        }

        public MyEvent setField9(String field9) {
            this.field9 = field9;
            return this;
        }

        public MyEvent setField5(String field5) {
            this.field5 = field5;
            return this;
        }

        public MyEvent setField6(String field6) {
            this.field6 = field6;
            return this;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof MyEvent)) return false;
            if (!super.equals(o)) return false;

            MyEvent that = (MyEvent) o;

            if (getField7() != that.getField7()) return false;
            if (!getResponseTime().equals(that.getResponseTime())) return false;
            if (!getField5().equals(that.getField5())) return false;
            if (!getField6().equals(that.getField6())) return false;
            if (!getField8().equals(that.getField8())) return false;
            return getField9().equals(that.getField9());
        }

        @Override
        public int hashCode() {
            int result = super.hashCode();
            result = 31 * result + getResponseTime().hashCode();
            result = 31 * result + getField5().hashCode();
            result = 31 * result + getField6().hashCode();
            result = 31 * result + getField7();
            result = 31 * result + getField8().hashCode();
            result = 31 * result + getField9().hashCode();
            return result;
        }
    }

Hi Sridhar,
According to the exception, the type of your "meEvents" stream is not recognized as a POJO. You can check that by printing "meEvents.getType()". In general, you can always check the log for such problems. There should be something like:

    14:40:57,079 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.examples.wordcount.WordCount$MyEvent does not contain a setter for field responseTime
    14:40:57,083 INFO org.apache.flink.api.java.typeutils.TypeExtractor - class org.apache.flink.streaming.examples.wordcount.WordCount$MyEvent is not a valid POJO type because not all fields are valid POJO fields.

The problem is that your setters have a return type. A POJO setter usually should have a void return type. But I agree that this should be mentioned in the documentation.

Regards,
Timo

On 07.09.17 at 05:20, Sridhar Chellappa wrote:
> I am trying to use the KeyBy operator as follows : [...]

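To make the setter rule from the reply above concrete, here is a minimal, Flink-free sketch in plain Java. The class names FluentEvent and VoidEvent and the helper hasVoidSetter are hypothetical and exist only for this illustration. The helper is a simplified stand-in for the kind of check that leads to the "does not contain a setter" log message; it is not Flink's actual TypeExtractor logic, which also checks parameter types, getters, and more.

```java
import java.lang.reflect.Method;

public class PojoSetterCheck {

    // Chained ("fluent") setter, as in the classes from the question:
    // the setter returns the object itself instead of void.
    public static class FluentEvent {
        private String field1;

        public FluentEvent() {
        }

        public String getField1() {
            return field1;
        }

        public FluentEvent setField1(String field1) {
            this.field1 = field1;
            return this;
        }
    }

    // Conventional bean-style setter with a void return type.
    public static class VoidEvent {
        private String field1;

        public VoidEvent() {
        }

        public String getField1() {
            return field1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }
    }

    // Hypothetical helper (an assumption for this sketch): returns true if
    // clazz has a public one-argument method named set<FieldName> whose
    // return type is void.
    public static boolean hasVoidSetter(Class<?> clazz, String fieldName) {
        String setterName = "set"
            + Character.toUpperCase(fieldName.charAt(0))
            + fieldName.substring(1);
        for (Method m : clazz.getMethods()) {
            if (m.getName().equals(setterName)
                    && m.getParameterCount() == 1
                    && m.getReturnType().equals(Void.TYPE)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println("FluentEvent: " + hasVoidSetter(FluentEvent.class, "field1"));
        System.out.println("VoidEvent:   " + hasVoidSetter(VoidEvent.class, "field1"));
    }
}
```

Under this simplified check, FluentEvent reports false and VoidEvent reports true. Applied to the classes in the question, the suggested fix would mean changing each chained setter, for example `public MyEvent setField6(String field6)`, to `public void setField6(String field6)` and dropping the `return this;` line.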
That fixed my issue. Thanks. I also agree that we need to fix the documentation.

On Thu, Sep 7, 2017 at 6:15 PM, Timo Walther <[hidden email]> wrote:

Following up: here's the JIRA ticket for improving the POJO data type documentation: https://issues.apache.org/jira/browse/FLINK-7614

- Gordon

On 11 September 2017 at 10:31:23 AM, Sridhar Chellappa ([hidden email]) wrote: