Thank you, Stefan. Any idea when we can expect the 1.6.3 release?
Sure, it is already merged as FLINK-10816.
Best, Stefan
Thanks a lot for looking into this issue Stefan.
Could you please let me know the issue ID once you open it? It'll help me understand the problem better, and also I could do a quick test in our environment once the issue is resolved.
Thanks, Shailesh
Really good finding, Stefan!
Hi,
I think I can already spot the problem: LockableTypeSerializer.duplicate() is not implemented properly, because it must also call duplicate() on the element serializer that is passed into the constructor of the new instance. I will open an issue and fix the problem.
Best, Stefan
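To illustrate the kind of bug described in the message above, here is a minimal, self-contained sketch. It is not Flink's code and not the actual fix: the Serializer interface, StatefulStringSerializer, Locked, LockedSerializer and duplicateShallow() are hypothetical stand-ins invented for this example. The assumption is that the nested element serializer is stateful (as a Kryo-backed serializer is), so a duplicate of the wrapper that keeps sharing it can end up being used from two threads at once, for example the task thread and an asynchronous checkpoint.

```java
import java.util.Objects;

public class DuplicateSketch {

    /** Hypothetical, heavily simplified serializer contract (not Flink's TypeSerializer). */
    interface Serializer<T> {
        T copy(T value);
        Serializer<T> duplicate();
    }

    /** Stateful element serializer: it reuses a scratch buffer, so an instance must not be shared across threads. */
    static final class StatefulStringSerializer implements Serializer<String> {
        private final StringBuilder scratch = new StringBuilder();

        @Override
        public String copy(String value) {
            scratch.setLength(0);
            scratch.append(value);
            return scratch.toString();
        }

        @Override
        public Serializer<String> duplicate() {
            // A stateful serializer hands out a fresh instance with its own scratch buffer.
            return new StatefulStringSerializer();
        }
    }

    /** Hypothetical wrapper value: an element plus a reference counter. */
    static final class Locked<T> {
        final T element;
        final int refCounter;

        Locked(T element, int refCounter) {
            this.element = element;
            this.refCounter = refCounter;
        }
    }

    /** Wrapper serializer that delegates to a nested element serializer. */
    static final class LockedSerializer<T> implements Serializer<Locked<T>> {
        private final Serializer<T> elementSerializer;

        LockedSerializer(Serializer<T> elementSerializer) {
            this.elementSerializer = Objects.requireNonNull(elementSerializer);
        }

        Serializer<T> getElementSerializer() {
            return elementSerializer;
        }

        @Override
        public Locked<T> copy(Locked<T> value) {
            return new Locked<>(elementSerializer.copy(value.element), value.refCounter);
        }

        /** Buggy variant: the new wrapper still shares the stateful nested serializer. */
        LockedSerializer<T> duplicateShallow() {
            return new LockedSerializer<>(elementSerializer);
        }

        /** Intended behaviour: duplicate the nested serializer as well. */
        @Override
        public LockedSerializer<T> duplicate() {
            return new LockedSerializer<>(elementSerializer.duplicate());
        }
    }

    public static void main(String[] args) {
        LockedSerializer<String> original = new LockedSerializer<>(new StatefulStringSerializer());
        LockedSerializer<String> shallow = original.duplicateShallow();
        LockedSerializer<String> deep = original.duplicate();

        System.out.println("shallow shares element serializer: "
                + (shallow.getElementSerializer() == original.getElementSerializer()));
        System.out.println("deep shares element serializer: "
                + (deep.getElementSerializer() == original.getElementSerializer()));
    }
}
```

In this sketch the shallow variant is harmless in a single thread; the problem would only show up when the original and the duplicate are used concurrently, which would be consistent with the failure appearing only when checkpointing is enabled.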
Hi Shailesh,
Could you maybe provide us with an example program that reproduces this problem? This would help the community debug it. It does not look right and might point to a bug in Flink. Thanks a lot!
Cheers, Till
On Tue, Oct 30, 2018 at 9:10 AM Dawid Wysakowicz < [hidden email]> wrote:
This is a problem with serializing your events using Kryo. I'm adding Gordon to cc, as he was recently working with serializers. He might give you more insight into what is going wrong.
Best, Dawid
On 25/10/2018 05:41, Shailesh Jain wrote:
Hi Dawid,
I ran two otherwise identical jobs, one with checkpointing enabled and one without. I'm hitting an ArrayIndexOutOfBoundsException (and sometimes an NPE) only when checkpointing (HDFS backend) is enabled, with the stack trace below.
Any ideas on what could be causing this?
Thanks,
Shailesh
2018-10-24 17:04:13,365 INFO org.apache.flink.runtime.taskmanager.Task - SelectCepOperatorMixedTime (1/1) - SelectCepOperatorMixedTime (1/1) (3d984b7919342a3886593401088ca2cd) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkRuntimeException: Failure happened in filter function.
    at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:731)
    at org.apache.flink.cep.nfa.NFA.computeNextStates(NFA.java:541)
    at org.apache.flink.cep.nfa.NFA.doProcess(NFA.java:284)
    at org.apache.flink.cep.nfa.NFA.process(NFA.java:220)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.processEvent(AbstractKeyedCEPPatternOperator.java:379)
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternMixedTimeApproachOperator.processElement(AbstractKeyedCEPPatternMixedTimeApproachOperator.java:45)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.WrappingRuntimeException: java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:305)
    at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
    at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:301)
    at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.materializeMatch(SharedBuffer.java:291)
    at org.apache.flink.cep.nfa.NFA$ConditionContext.getEventsForPattern(NFA.java:811)
    at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:70)
    at com.stellapps.contrakcep.flink.patterns.critical.AgitatorMalfunctionAfterChillingPattern$1.filter(AgitatorMalfunctionAfterChillingPattern.java:62)
    at org.apache.flink.cep.nfa.NFA.checkFilterCondition(NFA.java:742)
    at org.apache.flink.cep.nfa.NFA.createDecisionGraph(NFA.java:716)
    ... 10 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at com.esotericsoftware.kryo.util.IntArray.add(IntArray.java:61)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:800)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:655)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:231)
    at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:120)
    at org.apache.flink.cep.nfa.sharedbuffer.Lockable$LockableTypeSerializer.copy(Lockable.java:95)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:113)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
    at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
    at org.apache.flink.runtime.state.UserFacingMapState.get(UserFacingMapState.java:47)
    at org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer.lambda$materializeMatch$1(SharedBuffer.java:303)
    ... 18 more
On Fri, Sep 28, 2018 at 11:00 AM Shailesh Jain < [hidden email]> wrote:
Hi Dawid,
Thanks for your time on this. The diff should have pointed out only the top 3 commits, but since it did not, it is possible I did not rebase my branch against 1.4.2 correctly. I'll check this out and get back to you if I hit the same issue again.
Thanks again,
Shailesh
On Thu, Sep 27, 2018 at 1:00 PM Dawid Wysakowicz < [hidden email]> wrote:
Hi Shailesh,
I am afraid it is going to be hard to help you, as this branch differs significantly from the 1.4.2 release (I diffed your branch against tag/release-1.4.2). Moreover, the code in the branch you've provided still does not correspond to the lines in the exception you've posted previously. Could you check whether the problem occurs on vanilla Flink as well?
Best, Dawid
On 27/09/18 08:22, Shailesh Jain wrote:
Hi Dawid,
Yes, it is version 1.4.2. We are running vanilla Flink, but have added a couple of changes to the CEP operator specifically (top 3 commits here: https://github.com/jainshailesh/flink/commits/poc_on_1.4.2). The changes I've made to the CEP operators do not touch the checkpointing path; they just overload the operator for a specific way of handling event time.
We are hitting this in production, so I'm not sure it'll be feasible to move to 1.6.0 immediately, but eventually yes.
Thanks,
Shailesh
On Wed, Sep 26, 2018 at 5:44 PM Dawid Wysakowicz < [hidden email]> wrote:
Hi Shailesh,
Are you sure you are using version 1.4.2? Do you run vanilla Flink, or have you introduced some changes? I am asking because the lines in the stack trace do not align with the source code for 1.4.2.
Also, it is a different exception than the one in the issue you've linked, so if it is a problem, then it is definitely a different one.
Lastly, I would recommend upgrading to the newest version, as we rewrote the SharedBuffer implementation in 1.6.0.
Best, Dawid
On 26/09/18 13:50, Shailesh Jain wrote:
Hi,
I think I've hit this same issue on a 3-node standalone cluster (1.4.2) using HDFS (2.8.4) as the state backend.
2018-09-26 17:07:39,370 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340).
2018-09-26 17:07:39,370 INFO org.apache.flink.runtime.taskmanager.Task - SelectCepOperator (1/1) (3bec4aa1ef2226c4e0c5ff7b3860d340) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:948)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator SelectCepOperator (1/1).
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
Suppressed: java.lang.Exception: Could not properly cancel managed keyed state future.
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:91)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:976)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:939)
    ... 5 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NullPointerException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
    at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:66)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:89)
    ... 7 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:954)
    at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:825)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:888)
    at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:820)
    at org.apache.flink.runtime.state.heap.CopyOnWriteStateTableSnapshot.writeMappingsInKeyGroup(CopyOnWriteStateTableSnapshot.java:196)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:390)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:339)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:894)
    ... 5 more
[CIRCULAR REFERENCE:java.lang.NullPointerException]
On Sat, Nov 4, 2017 at 12:34 AM Federico D'Ambrosio < [hidden email]> wrote:
Thank you very
much for your steady
response, Kostas!
Cheers,
Federico