CEP issue in 1.3.2. Does 1.4 fix this ?

Vishal Santoshi (Jan 11, 2018)

When checkpointing is turned on, I see failures with a simple CEP loop pattern:

// checkStatusOn is a condition defined elsewhere in the job (not shown here)
private Pattern<Tuple2<Integer, SimpleBinaryEvent>, ?> alertPattern =
        Pattern.<Tuple2<Integer, SimpleBinaryEvent>>begin("start").where(checkStatusOn)
                .followedBy("middle").where(checkStatusOn).times(2)
                .next("end").where(checkStatusOn)
                .within(Time.minutes(5));

SimpleBinaryEvent is:

import java.io.Serializable;

public class SimpleBinaryEvent implements Serializable {

    private int id;
    private int sequence;
    private boolean status;
    private long time;

    public SimpleBinaryEvent(int id, int sequence, boolean status, long time) {
        this.id = id;
        this.sequence = sequence;
        this.status = status;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public int getSequence() {
        return sequence;
    }

    public boolean isStatus() {
        return status;
    }

    public long getTime() {
        return time;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        SimpleBinaryEvent that = (SimpleBinaryEvent) o;

        if (getId() != that.getId()) return false;
        if (isStatus() != that.isStatus()) return false;
        if (getSequence() != that.getSequence()) return false;
        return getTime() == that.getTime();
    }

    @Override
    public int hashCode() {
        // Alternative also tried: return Objects.hash(getId(), isStatus(), getSequence(), getTime());
        int result = getId();
        result = 31 * result + (isStatus() ? 1 : 0);
        result = 31 * result + getSequence();
        result = 31 * result + (int) (getTime() ^ (getTime() >>> 32));
        return result;
    }

    @Override
    public String toString() {
        return "SimpleBinaryEvent{" +
                "id='" + id + '\'' +
                ", status=" + status +
                ", sequence=" + sequence +
                ", time=" + time +
                '}';
    }
}

Failure cause:

Caused by: java.lang.Exception: Could not materialize checkpoint 2 for operator KeyedCEPPatternOperator -> Map (1/1).
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper((1,SimpleBinaryEvent{id='1', status=true, sequence=95, time=1505503380000}), 1505503380000, 0),....

I am sure equals() and hashCode() are implemented the way they should be; I have also tried the Objects.hash-based hashCode() (the commented-out line above). In other runs I have hit a circular reference (and thus a StackOverflowError) in SharedBuffer.toString(), which again points to issues with references (equality and the like). Without checkpointing turned on, it works as expected. I am running on a local cluster. Is CEP production-ready?

I am using Flink 1.3.2.
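
The error text suggests a lookup failure while the buffer is being snapshotted. As a rough illustration only, here is a heavily simplified, hypothetical model of serializing a buffer of linked entries: each live entry is assigned an integer ID, and each edge is then written as a pair of IDs. The names SnapshotIdSketch, Entry and snapshot are invented for this sketch and are not FlinkCEP's SharedBuffer code; the only assumption carried over is that edge targets are resolved through an ID map. In this model the lookup fails whenever an edge still points at an entry that was already removed, even though Entry relies on plain object identity for equality, so a correct equals()/hashCode() by itself does not rule the error out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified snapshot model; not FlinkCEP's actual SharedBuffer serializer.
public class SnapshotIdSketch {

    // A buffer entry holding a value; edges point to predecessor entries.
    // No equals()/hashCode() override: equality is object identity.
    static final class Entry {
        final String value;
        final List<Entry> edges = new ArrayList<>();

        Entry(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return "Entry(" + value + ")";
        }
    }

    // Assigns IDs to the live entries, then writes every edge as {sourceId, targetId}.
    static List<int[]> snapshot(List<Entry> liveEntries) {
        Map<Entry, Integer> ids = new HashMap<>();
        for (Entry e : liveEntries) {
            ids.put(e, ids.size()); // first entry gets 0, next gets 1, ...
        }
        List<int[]> edges = new ArrayList<>();
        for (Entry source : liveEntries) {
            for (Entry target : source.edges) {
                Integer targetId = ids.get(target);
                if (targetId == null) {
                    // The edge references an entry that is no longer in the buffer.
                    throw new IllegalStateException("Could not find id for entry: " + target);
                }
                edges.add(new int[] {ids.get(source), targetId});
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        Entry start = new Entry("start");
        Entry middle = new Entry("middle");
        middle.edges.add(start);

        // Consistent buffer: both entries are live, so the snapshot succeeds.
        System.out.println(snapshot(List.of(start, middle)).size()); // prints 1

        // Dangling reference: "start" was pruned, but "middle" still points to it.
        try {
            snapshot(List.of(middle));
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage()); // prints Could not find id for entry: Entry(start)
        }
    }
}
```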

Re: CEP issue in 1.3.2. Does 1.4 fix this ?

Dawid Wysakowicz (Jan 12, 2018)
Hi Vishal,
I think it might be due to this bug: https://issues.apache.org/jira/browse/FLINK-8226
It was merged for 1.4.1 and 1.5.0. Could you check with these changes applied? It would be really helpful. If the error still persists, could you file a JIRA issue?

Regards
Dawid

Re: CEP issue in 1.3.2. Does 1.4 fix this ?

Vishal Santoshi (Jan 12, 2018)
Thanks. We will.

When is 1.4.1 scheduled for release?

Re: CEP issue in 1.3.2. Does 1.4 fix this ?

Fabian Hueske (Jan 14, 2018)
We don't have a fixed schedule for bugfix releases; we do them based on need.
AFAIK, a discussion about a 1.4.1 release has not been started yet.

Would you like to kick that off by sending a mail to the dev mailing list?

Re: CEP issue in 1.3.2. Does 1.4 fix this ?

Vishal Santoshi (Jan 14, 2018)
Will do.

Re: CEP issue in 1.3.2. Does 1.4 fix this ?

Vishal Santoshi (Jan 21, 2018)
I have tested against the 1.5 SNAPSHOT (I simply pulled the source code into my distribution and compiled it into my job jar). Both the test code and the cluster seem to work OK. I have not tested the "savepoint and resume" mode, but restoring from a checkpoint works: I brought the JobManager down and restarted it. I still have to sanitize the output, but at least the exception is no longer thrown.

One thing, though; please confirm:

In CEP it seems that a POJO pushed into the window as part of a pattern match must have an "exact" equals()/hashCode(). In my case I had a custom equals()/hashCode() to support "contains" checks in a different context, and had deliberately left one instance variable out of the equals()/hashCode() contract. Is that a design decision or a requirement in CEP?

Thanks and Regards.

Re: CEP issue in 1.3.2. Does 1.4 fix this ?

Kostas Kloudas (Jan 23, 2018)
Hi Vishal,

Thanks for checking and glad to hear that your job works after the fix!

As for the equals/hashCode question: if you are asking whether you have to implement an exact equals() method and the corresponding hashCode(), then the answer is yes. These methods are used when retrieving and cleaning up “outdated” data from FlinkCEP’s internal data structures. As a consequence, ambiguous implementations can lead to the wrong elements being cleaned up.
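
A plain-Java illustration of that last point, using java.util.HashMap as a stand-in for an internal buffer. This shows general hash-collection behavior, not FlinkCEP's own data structures. The hypothetical LenientEvent leaves time out of equals()/hashCode(), so two distinct events collapse into a single key, and "cleaning up" one of them also drops the data stored for the other.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class LenientEqualsSketch {

    // Hypothetical event whose equals()/hashCode() deliberately ignore `time`.
    static final class LenientEvent {
        final int id;
        final boolean status;
        final long time;

        LenientEvent(int id, boolean status, long time) {
            this.id = id;
            this.status = status;
            this.time = time;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            LenientEvent that = (LenientEvent) o;
            return id == that.id && status == that.status; // time is not compared
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, status); // consistent with equals(), but lossy
        }
    }

    // Stores two distinct events, then removes only the older one by key.
    static int[] sizesBeforeAndAfterCleanup() {
        LenientEvent older = new LenientEvent(1, true, 1_000L);
        LenientEvent newer = new LenientEvent(1, true, 2_000L);

        Map<LenientEvent, String> buffer = new HashMap<>();
        buffer.put(older, "older data");
        // Equal key under the lenient equals(): the value is replaced, not added.
        buffer.put(newer, "newer data");
        int afterPuts = buffer.size(); // 1, not 2

        buffer.remove(older); // meant to drop the older event only
        return new int[] {afterPuts, buffer.size()};
    }

    public static void main(String[] args) {
        int[] sizes = sizesBeforeAndAfterCleanup();
        System.out.println(sizes[0] + " " + sizes[1]); // prints 1 0
    }
}
```

With an exact equals()/hashCode() that also compares time, the two events would occupy separate keys and removing the older one would leave the newer one in place.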

Thanks, 
Kostas

Re: CEP issue in 1.3.2. Does 1.4 fix this ?

Vishal Santoshi
Thanks. I have confirmed the above behavior through tests.
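
Given that requirement, one way to keep a lenient "contains" check for another context, sketched under the assumption that this check only runs in user code outside CEP: keep equals()/hashCode() exact over every field and move the lenient comparison into a separately named method. Event, sameIgnoringTime and containsIgnoringTime are hypothetical names loosely modeled on SimpleBinaryEvent.

```java
import java.util.List;
import java.util.Objects;

public class LenientContainsSketch {

    // Hypothetical event: equals()/hashCode() cover every field.
    static final class Event {
        final int id;
        final boolean status;
        final long time;

        Event(int id, boolean status, long time) {
            this.id = id;
            this.status = status;
            this.time = time;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Event that = (Event) o;
            return id == that.id && status == that.status && time == that.time;
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, status, time);
        }

        // Explicitly named comparison for the other context: ignores time.
        boolean sameIgnoringTime(Event other) {
            return other != null && id == other.id && status == other.status;
        }
    }

    // Lenient "contains" without weakening equals().
    static boolean containsIgnoringTime(List<Event> events, Event probe) {
        return events.stream().anyMatch(probe::sameIgnoringTime);
    }

    public static void main(String[] args) {
        List<Event> seen = List.of(new Event(1, true, 1_000L));
        Event probe = new Event(1, true, 2_000L);
        System.out.println(seen.contains(probe));              // prints false (exact equality)
        System.out.println(containsIgnoringTime(seen, probe)); // prints true (lenient check)
    }
}
```

The design choice is that the equality a library may rely on stays exact, while the looser notion of sameness is spelled out at the call sites that need it.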
