FsStateBackend with incremental backup enable does not work with Keyed CEP

classic Classic list List threaded Threaded
4 messages Options
Reply | Threaded
Open this post in threaded view
|

FsStateBackend with incremental backup enable does not work with Keyed CEP

Daiqing Li
Hi,

I am running fling 1.3.1 on EMR. But I am getting this exception after running for a while.


java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Could not copy NFA. at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) ... 7 more Caused by: java.io.StreamCorruptedException: invalid type code: 00 at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903) ... 17 more
Reply | Threaded
Open this post in threaded view
|

Re: FsStateBackend with incremental backup enable does not work with Keyed CEP

Stefan Richter
Hi,

from a quick look, I would say this is likely a problem with the NFASerializer: this class seems to be stateful, but its 'duplicate()‘ method is simply returning ‚this‘. This means that code which relies on duplication of serializers to shield against concurrent accesses can break, because multiple threads can work on the same internal serializer state and corrupt it. Will take a deeper look an monday.

Best,
Stefan

Am 11.08.2017 um 20:55 schrieb Daiqing Li <[hidden email]>:

Hi,

I am running fling 1.3.1 on EMR. But I am getting this exception after running for a while.


java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Could not copy NFA. at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) ... 7 more Caused by: java.io.StreamCorruptedException: invalid type code: 00 at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903) ... 17 more

Reply | Threaded
Open this post in threaded view
|

Re: FsStateBackend with incremental backup enable does not work with Keyed CEP

Kostas Kloudas
Hi Daiqing,

I think Stefan is right and this will be fixed in the upcoming release.
Could you open a JIRA for it with the Exception that you posted here?

Thanks,
Kostas

On Aug 12, 2017, at 10:05 AM, Stefan Richter <[hidden email]> wrote:

Hi,

from a quick look, I would say this is likely a problem with the NFASerializer: this class seems to be stateful, but its 'duplicate()‘ method is simply returning ‚this‘. This means that code which relies on duplication of serializers to shield against concurrent accesses can break, because multiple threads can work on the same internal serializer state and corrupt it. Will take a deeper look an monday.

Best,
Stefan

Am 11.08.2017 um 20:55 schrieb Daiqing Li <[hidden email]>:

Hi,

I am running fling 1.3.1 on EMR. But I am getting this exception after running for a while.


java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Could not copy NFA. at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) ... 7 more Caused by: java.io.StreamCorruptedException: invalid type code: 00 at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903) ... 17 more


Reply | Threaded
Open this post in threaded view
|

Re: FsStateBackend with incremental backup enable does not work with Keyed CEP

Daiqing Li
Hi,

Thank you for your reply! I will post it.
On Aug 12, 2017, at 4:20 AM, Kostas Kloudas <[hidden email]> wrote:

Hi Daiqing,

I think Stefan is right and this will be fixed in the upcoming release.
Could you open a JIRA for it with the Exception that you posted here?

Thanks,
Kostas

On Aug 12, 2017, at 10:05 AM, Stefan Richter <[hidden email]> wrote:

Hi,

from a quick look, I would say this is likely a problem with the NFASerializer: this class seems to be stateful, but its 'duplicate()‘ method is simply returning ‚this‘. This means that code which relies on duplication of serializers to shield against concurrent accesses can break, because multiple threads can work on the same internal serializer state and corrupt it. Will take a deeper look an monday.

Best,
Stefan

Am 11.08.2017 um 20:55 schrieb Daiqing Li <[hidden email]>:

Hi,

I am running fling 1.3.1 on EMR. But I am getting this exception after running for a while.


java.lang.RuntimeException: Exception occurred while processing valve output watermark: at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: Could not copy NFA. at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279) at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296) at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268) at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230) at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) ... 7 more Caused by: java.io.StreamCorruptedException: invalid type code: 00 at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620) at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719) at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882) at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815) at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169) at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957) at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903) ... 17 more