Keyed CEP checkpoint fails

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Keyed CEP checkpoint fails

Daiqing Li
Hi Flink user,

I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this exception after running for a while. Could anyone give me some help to debug this? I try parallelism 1, and it has the same problem. I also try reimplemented hashcode and equals method. I use UUID as hashcode right now. 
2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
	... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
	... 5 more
	Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
		at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
		at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
		... 5 more
Reply | Threaded
Open this post in threaded view
|

Re: Keyed CEP checkpoint fails

Dawid Wysakowicz
As @Kostas asked in your previous thread would be possible for you to share your code for that job or at least a minimal example to reproduce this behaviour. I fear we won’t be able to help you without any further info.

Regards,
Dawid

> On 10 Aug 2017, at 14:10, Daiqing Li <[hidden email]> wrote:
>
> Hi Flink user,
>
> I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this exception after running for a while. Could anyone give me some help to debug this? I try parallelism 1, and it has the same problem. I also try reimplemented hashcode and equals method. I use UUID as hashcode right now.
> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88
> d4) switched from RUNNING to FAILED.
> AsynchronousException{java.
> lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
> at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> StreamTask.java:970)
> at java.util.concurrent.
> Executors$RunnableAdapter.
> call(Executors.java:511)
> at java.util.concurrent.
> FutureTask.run(FutureTask.
> java:266)
> at java.util.concurrent.
> ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
> at java.util.concurrent.
> ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.
> java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
> ... 6 more
> Caused by: java.util.concurrent.
> ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(
> ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
> at java.util.concurrent.
> FutureTask.report(FutureTask.
> java:122)
> at java.util.concurrent.
> FutureTask.get(FutureTask.
> java:192)
> at org.apache.flink.util.
> FutureUtil.runIfNotDoneAndGet(
> FutureUtil.java:43)
> at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> StreamTask.java:897)
> ... 5 more
> Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
> at org.apache.flink.streaming.
> api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:
> 90)
> at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.
> cleanup(StreamTask.java:1023)
> at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> StreamTask.java:961)
> ... 5 more
>


signature.asc (817 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Keyed CEP checkpoint fails

Daiqing Li
Hi,

Here is the code. But I am not sure if you can reproduce the problem without data source.

Best,
Daiqing

On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <[hidden email]> wrote:
As @Kostas asked in your previous thread would be possible for you to share your code for that job or at least a minimal example to reproduce this behaviour. I fear we won’t be able to help you without any further info.

Regards,
Dawid

> On 10 Aug 2017, at 14:10, Daiqing Li <[hidden email]> wrote:
>
> Hi Flink user,
>
> I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this exception after running for a while. Could anyone give me some help to debug this? I try parallelism 1, and it has the same problem. I also try reimplemented hashcode and equals method. I use UUID as hashcode right now.
> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88
> d4) switched from RUNNING to FAILED.
> AsynchronousException{java.
> lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>       at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> StreamTask.java:970)
>       at java.util.concurrent.
> Executors$RunnableAdapter.
> call(Executors.java:511)
>       at java.util.concurrent.
> FutureTask.run(FutureTask.
> java:266)
>       at java.util.concurrent.
> ThreadPoolExecutor.runWorker(
> ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.
> ThreadPoolExecutor$Worker.run(
> ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.
> java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>       ... 6 more
> Caused by: java.util.concurrent.
> ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(
> ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>       at java.util.concurrent.
> FutureTask.report(FutureTask.
> java:122)
>       at java.util.concurrent.
> FutureTask.get(FutureTask.
> java:192)
>       at org.apache.flink.util.
> FutureUtil.runIfNotDoneAndGet(
> FutureUtil.java:43)
>       at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> StreamTask.java:897)
>       ... 5 more
>       Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>               at org.apache.flink.streaming.
> api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:
> 90)
>               at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.
> cleanup(StreamTask.java:1023)
>               at org.apache.flink.streaming.
> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> StreamTask.java:961)
>               ... 5 more
>



MilestoneEvent.java (1K) Download Attachment
example.java (11K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Keyed CEP checkpoint fails

Dawid Wysakowicz
You are right, I won’t be able to reproduce this problem without data. One thing I can tell though that I think the problem is indeed with the hashcode. Unforunately I don’t know Gson, but one strange thing I noticed is the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1). The first {} is your event.toString, which seems odd as if your event was empty.

Generally speaking as I understand this Exception is thrown because the hashcode of your event changes during serialization, and access to some internal temporary cache is broken.

> On 10 Aug 2017, at 14:29, Daiqing Li <[hidden email]> wrote:
>
> Hi,
>
> Here is the code. But I am not sure if you can reproduce the problem without data source.
>
> Best,
> Daiqing
>
> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <[hidden email]> wrote:
> As @Kostas asked in your previous thread would be possible for you to share your code for that job or at least a minimal example to reproduce this behaviour. I fear we won’t be able to help you without any further info.
>
> Regards,
> Dawid
>
> > On 10 Aug 2017, at 14:10, Daiqing Li <[hidden email]> wrote:
> >
> > Hi Flink user,
> >
> > I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this exception after running for a while. Could anyone give me some help to debug this? I try parallelism 1, and it has the same problem. I also try reimplemented hashcode and equals method. I use UUID as hashcode right now.
> > 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88
> > d4) switched from RUNNING to FAILED.
> > AsynchronousException{java.
> > lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
> >       at org.apache.flink.streaming.
> > runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> > StreamTask.java:970)
> >       at java.util.concurrent.
> > Executors$RunnableAdapter.
> > call(Executors.java:511)
> >       at java.util.concurrent.
> > FutureTask.run(FutureTask.
> > java:266)
> >       at java.util.concurrent.
> > ThreadPoolExecutor.runWorker(
> > ThreadPoolExecutor.java:1149)
> >       at java.util.concurrent.
> > ThreadPoolExecutor$Worker.run(
> > ThreadPoolExecutor.java:624)
> >       at java.lang.Thread.run(Thread.
> > java:748)
> > Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
> >       ... 6 more
> > Caused by: java.util.concurrent.
> > ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(
> > ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
> >       at java.util.concurrent.
> > FutureTask.report(FutureTask.
> > java:122)
> >       at java.util.concurrent.
> > FutureTask.get(FutureTask.
> > java:192)
> >       at org.apache.flink.util.
> > FutureUtil.runIfNotDoneAndGet(
> > FutureUtil.java:43)
> >       at org.apache.flink.streaming.
> > runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> > StreamTask.java:897)
> >       ... 5 more
> >       Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
> >               at org.apache.flink.streaming.
> > api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:
> > 90)
> >               at org.apache.flink.streaming.
> > runtime.tasks.StreamTask$AsyncCheckpointRunnable.
> > cleanup(StreamTask.java:1023)
> >               at org.apache.flink.streaming.
> > runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
> > StreamTask.java:961)
> >               ... 5 more
> >
>
>
> <MilestoneEvent.java><example.java>


signature.asc (817 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Keyed CEP checkpoint fails

Daiqing Li
Oh sorry, the data in {} is not empty because I hide private information about my model. Do you have that same conclusion?

> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz <[hidden email]> wrote:
>
> You are right, I won’t be able to reproduce this problem without data. One thing I can tell though that I think the problem is indeed with the hashcode. Unforunately I don’t know Gson, but one strange thing I noticed is the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1). The first {} is your event.toString, which seems odd as if your event was empty.
>
> Generally speaking as I understand this Exception is thrown because the hashcode of your event changes during serialization, and access to some internal temporary cache is broken.
>
>> On 10 Aug 2017, at 14:29, Daiqing Li <[hidden email]> wrote:
>>
>> Hi,
>>
>> Here is the code. But I am not sure if you can reproduce the problem without data source.
>>
>> Best,
>> Daiqing
>>
>> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <[hidden email]> wrote:
>> As @Kostas asked in your previous thread would be possible for you to share your code for that job or at least a minimal example to reproduce this behaviour. I fear we won’t be able to help you without any further info.
>>
>> Regards,
>> Dawid
>>
>>> On 10 Aug 2017, at 14:10, Daiqing Li <[hidden email]> wrote:
>>>
>>> Hi Flink user,
>>>
>>> I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this exception after running for a while. Could anyone give me some help to debug this? I try parallelism 1, and it has the same problem. I also try reimplemented hashcode and equals method. I use UUID as hashcode right now.
>>> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88
>>> d4) switched from RUNNING to FAILED.
>>> AsynchronousException{java.
>>> lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>>>      at org.apache.flink.streaming.
>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>> StreamTask.java:970)
>>>      at java.util.concurrent.
>>> Executors$RunnableAdapter.
>>> call(Executors.java:511)
>>>      at java.util.concurrent.
>>> FutureTask.run(FutureTask.
>>> java:266)
>>>      at java.util.concurrent.
>>> ThreadPoolExecutor.runWorker(
>>> ThreadPoolExecutor.java:1149)
>>>      at java.util.concurrent.
>>> ThreadPoolExecutor$Worker.run(
>>> ThreadPoolExecutor.java:624)
>>>      at java.lang.Thread.run(Thread.
>>> java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>>>      ... 6 more
>>> Caused by: java.util.concurrent.
>>> ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(
>>> ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>>>      at java.util.concurrent.
>>> FutureTask.report(FutureTask.
>>> java:122)
>>>      at java.util.concurrent.
>>> FutureTask.get(FutureTask.
>>> java:192)
>>>      at org.apache.flink.util.
>>> FutureUtil.runIfNotDoneAndGet(
>>> FutureUtil.java:43)
>>>      at org.apache.flink.streaming.
>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>> StreamTask.java:897)
>>>      ... 5 more
>>>      Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>              at org.apache.flink.streaming.
>>> api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:
>>> 90)
>>>              at org.apache.flink.streaming.
>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.
>>> cleanup(StreamTask.java:1023)
>>>              at org.apache.flink.streaming.
>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>> StreamTask.java:961)
>>>              ... 5 more
>>>
>>
>>
>> <MilestoneEvent.java><example.java>
>

Reply | Threaded
Open this post in threaded view
|

Re: Keyed CEP checkpoint fails

Dawid Wysakowicz
Yes, with the information I have, the conclusion would be the same, that I think the reason is problem with hashcode. Without some data to reproduce it unfortunately I won’t be able to help you further. I could just advise you to debug the method SharedBuffer#serialize and pay attention to the entryID map.

> On 10 Aug 2017, at 14:54, Daiqing Li <[hidden email]> wrote:
>
> Oh sorry, the data in {} is not empty because I hide private information about my model. Do you have that same conclusion?
>> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz <[hidden email]> wrote:
>>
>> You are right, I won’t be able to reproduce this problem without data. One thing I can tell though that I think the problem is indeed with the hashcode. Unforunately I don’t know Gson, but one strange thing I noticed is the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1). The first {} is your event.toString, which seems odd as if your event was empty.
>>
>> Generally speaking as I understand this Exception is thrown because the hashcode of your event changes during serialization, and access to some internal temporary cache is broken.
>>
>>> On 10 Aug 2017, at 14:29, Daiqing Li <[hidden email]> wrote:
>>>
>>> Hi,
>>>
>>> Here is the code. But I am not sure if you can reproduce the problem without data source.
>>>
>>> Best,
>>> Daiqing
>>>
>>> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <[hidden email]> wrote:
>>> As @Kostas asked in your previous thread would be possible for you to share your code for that job or at least a minimal example to reproduce this behaviour. I fear we won’t be able to help you without any further info.
>>>
>>> Regards,
>>> Dawid
>>>
>>>> On 10 Aug 2017, at 14:10, Daiqing Li <[hidden email]> wrote:
>>>>
>>>> Hi Flink user,
>>>>
>>>> I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this exception after running for a while. Could anyone give me some help to debug this? I try parallelism 1, and it has the same problem. I also try reimplemented hashcode and equals method. I use UUID as hashcode right now.
>>>> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88
>>>> d4) switched from RUNNING to FAILED.
>>>> AsynchronousException{java.
>>>> lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>>>>     at org.apache.flink.streaming.
>>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>>> StreamTask.java:970)
>>>>     at java.util.concurrent.
>>>> Executors$RunnableAdapter.
>>>> call(Executors.java:511)
>>>>     at java.util.concurrent.
>>>> FutureTask.run(FutureTask.
>>>> java:266)
>>>>     at java.util.concurrent.
>>>> ThreadPoolExecutor.runWorker(
>>>> ThreadPoolExecutor.java:1149)
>>>>     at java.util.concurrent.
>>>> ThreadPoolExecutor$Worker.run(
>>>> ThreadPoolExecutor.java:624)
>>>>     at java.lang.Thread.run(Thread.
>>>> java:748)
>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>>>>     ... 6 more
>>>> Caused by: java.util.concurrent.
>>>> ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(
>>>> ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>>>>     at java.util.concurrent.
>>>> FutureTask.report(FutureTask.
>>>> java:122)
>>>>     at java.util.concurrent.
>>>> FutureTask.get(FutureTask.
>>>> java:192)
>>>>     at org.apache.flink.util.
>>>> FutureUtil.runIfNotDoneAndGet(
>>>> FutureUtil.java:43)
>>>>     at org.apache.flink.streaming.
>>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>>> StreamTask.java:897)
>>>>     ... 5 more
>>>>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>             at org.apache.flink.streaming.
>>>> api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:
>>>> 90)
>>>>             at org.apache.flink.streaming.
>>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.
>>>> cleanup(StreamTask.java:1023)
>>>>             at org.apache.flink.streaming.
>>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>>> StreamTask.java:961)
>>>>             ... 5 more
>>>>
>>>
>>>
>>> <MilestoneEvent.java><example.java>
>>
>


signature.asc (817 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Keyed CEP checkpoint fails

Daiqing Li
Hi Dawid,

After rewriting dashcode with Objects.hash for all the fields, I still get the same error. One thing special is checkpoints always fail at 428, after trying many times. Does it mean anything?

> On Aug 10, 2017, at 9:14 AM, Dawid Wysakowicz <[hidden email]> wrote:
>
> Yes, with the information I have, the conclusion would be the same, that I think the reason is problem with hashcode. Without some data to reproduce it unfortunately I won’t be able to help you further. I could just advise you to debug the method SharedBuffer#serialize and pay attention to the entryID map.
>
>> On 10 Aug 2017, at 14:54, Daiqing Li <[hidden email]> wrote:
>>
>> Oh sorry, the data in {} is not empty because I hide private information about my model. Do you have that same conclusion?
>>> On Aug 10, 2017, at 8:52 AM, Dawid Wysakowicz <[hidden email]> wrote:
>>>
>>> You are right, I won’t be able to reproduce this problem without data. One thing I can tell though that I think the problem is indeed with the hashcode. Unforunately I don’t know Gson, but one strange thing I noticed is the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1). The first {} is your event.toString, which seems odd as if your event was empty.
>>>
>>> Generally speaking as I understand this Exception is thrown because the hashcode of your event changes during serialization, and access to some internal temporary cache is broken.
>>>
>>>> On 10 Aug 2017, at 14:29, Daiqing Li <[hidden email]> wrote:
>>>>
>>>> Hi,
>>>>
>>>> Here is the code. But I am not sure if you can reproduce the problem without data source.
>>>>
>>>> Best,
>>>> Daiqing
>>>>
>>>> On Thu, Aug 10, 2017 at 8:15 AM, Dawid Wysakowicz <[hidden email]> wrote:
>>>> As @Kostas asked in your previous thread would be possible for you to share your code for that job or at least a minimal example to reproduce this behaviour. I fear we won’t be able to help you without any further info.
>>>>
>>>> Regards,
>>>> Dawid
>>>>
>>>>> On 10 Aug 2017, at 14:10, Daiqing Li <[hidden email]> wrote:
>>>>>
>>>>> Hi Flink user,
>>>>>
>>>>> I am using FsStateBackend and AWS EMR YARN to run a CEP job. I got this exception after running for a while. Could anyone give me some help to debug this? I try parallelism 1, and it has the same problem. I also try reimplemented hashcode and equals method. I use UUID as hashcode right now.
>>>>> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88
>>>>> d4) switched from RUNNING to FAILED.
>>>>> AsynchronousException{java.
>>>>> lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>>>>>    at org.apache.flink.streaming.
>>>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>>>> StreamTask.java:970)
>>>>>    at java.util.concurrent.
>>>>> Executors$RunnableAdapter.
>>>>> call(Executors.java:511)
>>>>>    at java.util.concurrent.
>>>>> FutureTask.run(FutureTask.
>>>>> java:266)
>>>>>    at java.util.concurrent.
>>>>> ThreadPoolExecutor.runWorker(
>>>>> ThreadPoolExecutor.java:1149)
>>>>>    at java.util.concurrent.
>>>>> ThreadPoolExecutor$Worker.run(
>>>>> ThreadPoolExecutor.java:624)
>>>>>    at java.lang.Thread.run(Thread.
>>>>> java:748)
>>>>> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>>>>>    ... 6 more
>>>>> Caused by: java.util.concurrent.
>>>>> ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(
>>>>> ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>>>>>    at java.util.concurrent.
>>>>> FutureTask.report(FutureTask.
>>>>> java:122)
>>>>>    at java.util.concurrent.
>>>>> FutureTask.get(FutureTask.
>>>>> java:192)
>>>>>    at org.apache.flink.util.
>>>>> FutureUtil.runIfNotDoneAndGet(
>>>>> FutureUtil.java:43)
>>>>>    at org.apache.flink.streaming.
>>>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>>>> StreamTask.java:897)
>>>>>    ... 5 more
>>>>>    Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>>>>>            at org.apache.flink.streaming.
>>>>> api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:
>>>>> 90)
>>>>>            at org.apache.flink.streaming.
>>>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.
>>>>> cleanup(StreamTask.java:1023)
>>>>>            at org.apache.flink.streaming.
>>>>> runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(
>>>>> StreamTask.java:961)
>>>>>            ... 5 more
>>>>>
>>>>
>>>>
>>>> <MilestoneEvent.java><example.java>
>>>
>>
>