With checkpointing turned on, I see failures with a simple CEP loop pattern:

    private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
        Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
            .followedBy("middle").where(checkStatusOn).times(2)
            .next("end").where(checkStatusOn).within(Time.minutes(5));

SimpleBinaryEvent is

    import java.io.Serializable;

    public class SimpleBinaryEvent implements Serializable {

        private int id;
        private int sequence;
        private boolean status;
        private long time;

        public SimpleBinaryEvent(int id, int sequence, boolean status, long time) {
            this.id = id;
            this.sequence = sequence;
            this.status = status;
            this.time = time;
        }

        public int getId() {
            return id;
        }

        public int getSequence() {
            return sequence;
        }

        public boolean isStatus() {
            return status;
        }

        public long getTime() {
            return time;
        }

        // Equality covers all four fields.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;

            SimpleBinaryEvent that = (SimpleBinaryEvent) o;

            if (getId() != that.getId()) return false;
            if (isStatus() != that.isStatus()) return false;
            if (getSequence() != that.getSequence()) return false;
            return getTime() == that.getTime();
        }

        // Hash is built from the same four fields used in equals().
        @Override
        public int hashCode() {
            //return Objects.hash(getId(),isStatus(), getSequence(),getTime());
            int result = getId();
            result = 31 * result + (isStatus() ? 1 : 0);
            result = 31 * result + getSequence();
            result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
            return result;
        }

        @Override
        public String toString() {
            return "SimpleBinaryEvent{" +
                    "id='" + id + '\'' +
                    ", status=" + status +
                    ", sequence=" + sequence +
                    ", time=" + time +
                    '}';
        }
    }

failure cause:

    Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
        ... 6 more
    Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....

I am sure I have equals() and hashCode() implemented the way they should be. I have tried Objects.hash() too. In other instances I have hit a circular reference (and thus a stack overflow) in SharedBuffer.toString(), which again points to issues with references (equality and so on). Without checkpointing turned on it works as expected. I am running on a local cluster. Is CEP production ready?

I am using Flink 1.3.2.
|
Hi Vishal,

I think it might be due to this bug: https://issues.apache.org/jira/browse/FLINK-8226

The fix was merged for 1.4.1 and 1.5.0. Could you check with these changes applied? That would be really helpful. If the error still persists, could you file a JIRA?

Regards,
Dawid

On 11 Jan 2018, at 19:49, Vishal Santoshi <[hidden email]> wrote:
|
Thanks, we will. When is 1.4.1 scheduled for release?

On Fri, Jan 12, 2018 at 3:24 AM, Dawid Wysakowicz <[hidden email]> wrote:
|
We don't have a schedule for bugfix releases but do them based on need. AFAIK, a discussion about a 1.4.1 release has not been started yet. Would you like to kick that off by sending a mail to the dev mailing list?

2018-01-12 16:41 GMT+01:00 Vishal Santoshi <[hidden email]>:
|
Will do.

On Sun, Jan 14, 2018 at 10:23 AM, Fabian Hueske <[hidden email]> wrote:
|
I have tested against the 1.5 SNAPSHOT (I simply pulled the source code into my distribution and compiled it into my job jar). Both the test code and the cluster seem to work OK. I have not tested the "savepoint and resume" mode, but restore from checkpoint works: I brought the JM down and restarted it. I still have to sanitize the output, but at least the exception is not thrown.

One thing though, and please confirm: in CEP it seems that a POJO pushed into the window as part of a pattern match has to have an "exact" equals/hashCode. In my case I had a custom equals/hashCode to enable "contains" in a different context, that is, I had deliberately left an instance variable out of the equals/hashCode contract. Is that a design decision or a requirement in CEP?

Thanks and regards.

On Sun, Jan 14, 2018 at 12:27 PM, Vishal Santoshi <[hidden email]> wrote:
|
Hi Vishal,
Thanks for checking, and glad to hear that your job works after the fix!

As for the equals/hashCode question: if you are asking whether you have to implement an exact equals() method and the corresponding hashCode(), then the answer is yes. These methods are used when retrieving and cleaning up “outdated” data from FlinkCEP’s internal data structures. As a consequence, ambiguous implementations can lead to the wrong elements being cleaned up.

Thanks,
Kostas
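
For readers of the archive, the effect described above can be reproduced without Flink. The following is only a minimal sketch, not FlinkCEP's actual code: a plain java.util.HashSet stands in for the internal buffer, and the classes LooseEvent and ExactEvent are hypothetical. LooseEvent deliberately leaves 'time' out of equals/hashCode, so two distinct events collapse into one entry and cleaning up the "outdated" one removes the only copy.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

class AmbiguousEqualsDemo {

    /** Hypothetical event whose equals/hashCode deliberately ignore 'time'. */
    static final class LooseEvent {
        final int id;
        final long time;

        LooseEvent(int id, long time) {
            this.id = id;
            this.time = time;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof LooseEvent)) return false;
            return id == ((LooseEvent) o).id; // 'time' is left out on purpose
        }

        @Override
        public int hashCode() {
            return Integer.hashCode(id);
        }
    }

    /** Hypothetical event whose equals/hashCode cover every field. */
    static final class ExactEvent {
        final int id;
        final long time;

        ExactEvent(int id, long time) {
            this.id = id;
            this.time = time;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof ExactEvent)) return false;
            ExactEvent that = (ExactEvent) o;
            return id == that.id && time == that.time;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, time);
        }
    }

    /** Buffers two events, cleans up the older one, returns how many remain. */
    static int looseSurvivors() {
        Set<LooseEvent> buffer = new HashSet<>();
        buffer.add(new LooseEvent(1, 100L));
        buffer.add(new LooseEvent(1, 200L));    // treated as a duplicate, not stored
        buffer.remove(new LooseEvent(1, 100L)); // removes the only stored entry
        return buffer.size();
    }

    /** Same steps with exact equality: only the older event is removed. */
    static int exactSurvivors() {
        Set<ExactEvent> buffer = new HashSet<>();
        buffer.add(new ExactEvent(1, 100L));
        buffer.add(new ExactEvent(1, 200L));
        buffer.remove(new ExactEvent(1, 100L));
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("loose equals, events left: " + looseSurvivors());
        System.out.println("exact equals, events left: " + exactSurvivors());
    }
}
```

With the loose variant the newer event is gone after the cleanup (0 events left), while the exact variant keeps it (1 event left). If a partial comparison is needed elsewhere, keeping it in a separate helper or key object rather than in equals()/hashCode() avoids this ambiguity.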
|
Thanks. I have confirmed the above behavior through tests.

On Tue, Jan 23, 2018 at 4:09 AM, Kostas Kloudas <[hidden email]> wrote:
|