Hi Flink user,
I am using FsStateBackend and AWS EMR (YARN) to run a CEP job. After running for a while, the job fails with this exception. Could anyone help me debug it? I tried parallelism 1 and got the same problem. I also tried reimplementing the hashCode and equals methods; right now I use a UUID as the hash code.
As @Kostas asked in your previous thread, would it be possible for you to share the code for that job, or at least a minimal example that reproduces this behaviour? I fear we won’t be able to help you without further information.
Regards,
Dawid

> On 10 Aug 2017, at 14:10, Daiqing Li <[hidden email]> wrote:
>
> 2017-08-09 18:15:04,572 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4) (d4749a4c3469732a2a5edf40b83f88d4) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 946 for operator KeyedCEPPatternOperator -> Map -> Sink: Unnamed (3/4).
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1)
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>     ... 5 more
>     Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>         ... 5 more
Hi,

Here is the code, but I am not sure you can reproduce the problem without the data source.

Best,
Daiqing

<MilestoneEvent.java><example.java>
You are right, I won’t be able to reproduce this problem without data. One thing I can say, though, is that I think the problem is indeed with the hashcode. Unfortunately I don’t know Gson, but one strange thing I noticed is the exception message: SharedBufferEntry(ValueTimeWrapper({}, 1502298303586, 0), [SharedBufferEdge(null, 1)], 1). The first {} is your event.toString(), which seems odd, as if your event were empty.
Generally speaking, as I understand it, this exception is thrown because the hashcode of your event changes during serialization, which breaks access to an internal temporary cache.
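The failure mode Dawid describes can be illustrated with a plain java.util.HashMap. This is a minimal, hypothetical sketch, not Flink's actual SharedBuffer code: MutableEvent and lookupAfterMutation are made-up names. It shows that if a field feeding hashCode() changes after an object has been used as a map key, a later lookup of the very same object misses, which is analogous to "Could not find id for entry".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MutableHashDemo {

    // Hypothetical event type whose hashCode() depends on a mutable field.
    static final class MutableEvent {
        String payload;

        MutableEvent(String payload) {
            this.payload = payload;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof MutableEvent
                    && Objects.equals(payload, ((MutableEvent) o).payload);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(payload);
        }
    }

    // Registers an entry in an id map, mutates it, then looks it up again.
    // Returns null when the lookup misses.
    public static Integer lookupAfterMutation() {
        MutableEvent event = new MutableEvent("a");
        Map<MutableEvent, Integer> entryIds = new HashMap<>();
        entryIds.put(event, 0);

        // The map still files the entry under the hash of "a"; after this change,
        // get() searches with the hash of "b" and does not find it, even though
        // it is the very same object.
        event.payload = "b";
        return entryIds.get(event);
    }

    public static void main(String[] args) {
        Integer id = lookupAfterMutation();
        System.out.println(id == null ? "Could not find id for entry" : "found id " + id);
    }
}
```

Running main prints "Could not find id for entry". The same miss can happen whenever hashed state changes between inserting an event and looking it up again.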
Oh, sorry: the data in {} is not actually empty; I hid private information about my model. Do you still reach the same conclusion?
Yes, with the information I have, my conclusion would be the same: I think the cause is a problem with the hashcode. Unfortunately, without some data to reproduce it, I won’t be able to help you further. All I can suggest is to debug the method SharedBuffer#serialize and pay attention to the entryID map.
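A complementary check that needs no Flink internals is to verify, outside Flink, that the event type's hashCode() and equals() survive a serialize/deserialize round trip. The sketch below uses plain Java serialization as a stand-in, which is an assumption: Flink's own serializers (Kryo or the POJO serializer) may behave differently. Event and RandomIdEvent are hypothetical types. RandomIdEvent shows one hypothetical way a UUID-based hash can break, by keeping the UUID in a transient field that is not serialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Objects;
import java.util.UUID;

public class HashStabilityCheck {

    // Hypothetical event with a value-based, deterministic hashCode().
    public static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String id;
        private final long timestamp;

        public Event(String id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Event)) return false;
            Event other = (Event) o;
            return timestamp == other.timestamp && Objects.equals(id, other.id);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, timestamp);
        }
    }

    // Hypothetical event whose random UUID lives in a transient field: it is not
    // serialized, so the deserialized copy has id == null and a different hash.
    public static final class RandomIdEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        private final transient UUID id = UUID.randomUUID();

        @Override
        public boolean equals(Object o) {
            return o instanceof RandomIdEvent && Objects.equals(id, ((RandomIdEvent) o).id);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(id);
        }
    }

    // Copies a value through plain Java serialization (a stand-in for Flink's serializers).
    static Object roundTrip(Serializable value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    // True if the hash is stable across calls and equals()/hashCode() agree
    // between the original and its deserialized copy.
    public static boolean isStable(Serializable value) {
        Object copy = roundTrip(value);
        return value.hashCode() == value.hashCode()
                && copy.hashCode() == value.hashCode()
                && copy.equals(value)
                && value.equals(copy);
    }

    public static void main(String[] args) {
        System.out.println("Event stable: " + isStable(new Event("e-1", 1502298303586L)));
        System.out.println("RandomIdEvent stable: " + isStable(new RandomIdEvent()));
    }
}
```

If the real event type fails a check like this, or its hash changes between two calls, that would be consistent with the entryID map lookup failing during SharedBuffer#serialize.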
Hi Dawid,
After rewriting hashCode with Objects.hash over all the fields, I still get the same error. One peculiar thing: after trying many times, the checkpoints always fail at 428. Does that mean anything?
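Objects.hash over all fields only keeps the hash stable if none of those fields change after the event reaches the CEP operator. A minimal sketch, assuming hypothetical fields id, timestamp and attributes (not the real MilestoneEvent), is to make the event immutable and copy any collection field defensively:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public final class ImmutableEvent {
    private final String id;
    private final long timestamp;
    // Defensive, unmodifiable copy so callers cannot change the hashed state later.
    private final Map<String, String> attributes;

    public ImmutableEvent(String id, long timestamp, Map<String, String> attributes) {
        this.id = id;
        this.timestamp = timestamp;
        this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
    }

    public String getId() { return id; }
    public long getTimestamp() { return timestamp; }
    public Map<String, String> getAttributes() { return attributes; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ImmutableEvent)) return false;
        ImmutableEvent other = (ImmutableEvent) o;
        return timestamp == other.timestamp
                && Objects.equals(id, other.id)
                && Objects.equals(attributes, other.attributes);
    }

    @Override
    public int hashCode() {
        // Same fields as equals(), all final and effectively immutable.
        return Objects.hash(id, timestamp, attributes);
    }

    public static void main(String[] args) {
        Map<String, String> source = new HashMap<>();
        source.put("status", "a");
        ImmutableEvent event = new ImmutableEvent("e-1", 1502298303586L, source);
        int before = event.hashCode();
        source.put("status", "b"); // changing the caller's map does not affect the event
        System.out.println(before == event.hashCode());
    }
}
```

One design trade-off: a class without a public no-argument constructor does not satisfy Flink's POJO rules, so Flink would treat it as a generic type and serialize it with Kryo. Whether that matters depends on the job.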