Flink CEP exception during RocksDB update

Flink CEP exception during RocksDB update

Varun Dhore
Hello Flink community,

I have encountered the following exception while testing the 1.4.0 release. It occurs intermittently, and my CEP job keeps restarting after it. I am running the job with event-time semantics and checkpointing enabled.

    java.lang.RuntimeException: Exception occurred while processing valve output watermark:
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.RuntimeException: Error while adding data to RocksDB
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:102)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.updateNFA(AbstractKeyedCEPPatternOperator.java:276)
        at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:248)
        at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
        ... 7 more
    Caused by: java.lang.IllegalStateException: Could not find id for entry: SharedBufferEntry(ValueTimeWrapper(Event(id: 1,name: "e1", timestamp: 1515593398897), 1515593398897, 0), [SharedBufferEdge(null, 1)], 2)
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:972)
        at org.apache.flink.cep.nfa.SharedBuffer$SharedBufferSerializer.serialize(SharedBuffer.java:839)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:919)
        at org.apache.flink.cep.nfa.NFA$NFASerializer.serialize(NFA.java:839)
        at org.apache.flink.contrib.streaming.state.RocksDBValueState.update(RocksDBValueState.java:99)
        ... 13 more

Thanks,
Varun
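The innermost cause is a precondition failing inside the shared-buffer serializer. As a rough illustration only, here is a toy model in plain Java (not Flink's code; the `IdTableDemo` class, `Entry` type and `serialize` method are hypothetical). It shows how a two-pass serializer, which first assigns an integer id to every stored entry and then writes each edge as the id of its target, fails with this kind of message when an edge still points at an entry that is no longer stored. In the reported message the missing entry's only edge has a null target, which in this toy model corresponds to the first event of a match. Whether this is what actually happens inside Flink's `SharedBuffer` is an assumption; the thread does not establish the root cause.

```java
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class IdTableDemo {
    // Hypothetical, simplified stand-in for a buffer entry: a value plus edges to
    // predecessor entries (a null edge target marks the start of a match).
    static final class Entry {
        final String value;
        final List<Entry> edges = new ArrayList<>();
        Entry(String value) { this.value = value; }
        @Override public String toString() { return "Entry(" + value + ")"; }
    }

    // Pass 1 gives every stored entry an integer id; pass 2 writes each edge as
    // (sourceId, targetId). An edge whose target is not stored has no id.
    static List<int[]> serialize(List<Entry> stored) {
        Map<Entry, Integer> ids = new IdentityHashMap<>();
        for (Entry e : stored) {
            ids.put(e, ids.size());
        }
        List<int[]> out = new ArrayList<>();
        for (Entry e : stored) {
            for (Entry target : e.edges) {
                int targetId = -1; // -1 encodes a null target
                if (target != null) {
                    Integer id = ids.get(target);
                    if (id == null) {
                        throw new IllegalStateException("Could not find id for entry: " + target);
                    }
                    targetId = id;
                }
                out.add(new int[] {ids.get(e), targetId});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Entry a = new Entry("a1");
        a.edges.add(null);   // a1 starts a match
        Entry b = new Entry("b1");
        b.edges.add(a);      // b1 points back at a1

        System.out.println(serialize(List.of(a, b)).size()); // prints 2

        // Simulate a1 being dropped from storage while b1 still points at it.
        try {
            serialize(List.of(b));
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage()); // prints "Could not find id for entry: Entry(a1)"
        }
    }
}
```

The point of the sketch is only that this error signals a dangling reference between stored entries, not a RocksDB write problem as such; RocksDB merely triggers serialization.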

Re: Flink CEP exception during RocksDB update

Kostas Kloudas
Hi Varun,

This may be related to this issue: https://issues.apache.org/jira/browse/FLINK-8226
which has already been fixed on master.

Could you please try the current master to see if the error persists?

Thanks,
Kostas

On Jan 15, 2018, at 4:09 PM, Varun Dhore wrote:

Re: Flink CEP exception during RocksDB update

Varun Dhore
Thank you, Kostas. Since this error is not easy to reproduce on my end, I'll keep testing and confirm the fix once I am able to.

Thanks,
Varun 

Sent from my iPhone

On Jan 15, 2018, at 10:21 AM, Kostas Kloudas wrote:

Re: Flink CEP exception during RocksDB update

Kostas Kloudas
Thanks a lot, Varun!

Kostas

On Jan 17, 2018, at 9:59 PM, Varun Dhore wrote:

Re: Flink CEP exception during RocksDB update

Varun Dhore
Hi Kostas,

I was able to reproduce the error with 1.4.0. After upgrading the cluster to a 1.5 snapshot and replaying the same data, I still get the same exception. The CEP patterns I am running use "followed by" patterns, e.g. AfBfC (A followed by B followed by C). In my experience, I have never been able to get stable execution with checkpointing enabled; with checkpointing disabled, the CEP jobs run fine. Aside from this particular error, I also notice that most checkpoints expire because they do not complete within the configured 5-minute timeout. Any suggestions on how to further debug checkpoints at runtime would be very helpful.
Thanks in advance for your assistance.

Regards,
Varun 
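To make the AfBfC notation concrete, the toy matcher below sketches A followedBy B followedBy C. It is not Flink's NFA, and the `FollowedByDemo` class and `match` method are hypothetical. It mirrors the relaxed-contiguity behavior documented for `followedBy` (non-matching events are skipped and the first matching event is taken, unlike `followedByAny`), and it also reports which partial matches are still open when the input ends. Open partial matches are the kind of state a CEP operator has to keep, and include in checkpoints, until they complete or expire, so this sketch may help when reasoning about how much state such patterns accumulate.

```java
import java.util.ArrayList;
import java.util.List;

public class FollowedByDemo {

    /** Completed matches plus partial matches still waiting for their next event. */
    record Result(List<List<String>> complete, List<List<String>> open) {}

    // Toy matcher, NOT Flink's NFA. An event is a string whose first character is
    // its type (e.g. "b2" has type 'b'); the pattern "abc" stands for
    // A followedBy B followedBy C. There is no time window in this model.
    static Result match(String pattern, List<String> events) {
        List<List<String>> open = new ArrayList<>();
        List<List<String>> complete = new ArrayList<>();
        for (String event : events) {
            char type = event.charAt(0);
            List<List<String>> stillOpen = new ArrayList<>();
            for (List<String> partial : open) {
                // Relaxed contiguity: take the first event of the expected type, skip others.
                if (type == pattern.charAt(partial.size())) {
                    List<String> extended = new ArrayList<>(partial);
                    extended.add(event);
                    if (extended.size() == pattern.length()) {
                        complete.add(extended);
                    } else {
                        stillOpen.add(extended);
                    }
                } else {
                    stillOpen.add(partial);
                }
            }
            // Every event of the first type starts its own partial match.
            if (type == pattern.charAt(0)) {
                List<String> started = new ArrayList<>();
                started.add(event);
                if (pattern.length() == 1) {
                    complete.add(started);
                } else {
                    stillOpen.add(started);
                }
            }
            open = stillOpen;
        }
        return new Result(complete, open);
    }

    public static void main(String[] args) {
        List<String> events = List.of("a1", "x", "b1", "b2", "a2", "c1", "b3", "c2", "a3");
        Result r = match("abc", events);
        System.out.println("complete: " + r.complete()); // [[a1, b1, c1], [a2, b3, c2]]
        System.out.println("open:     " + r.open());     // [[a3]]
    }
}
```

With this input, b2 is skipped because the partial match started by a1 has already taken b1, and a3 is left as an open partial match that would have to be carried in state.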

On Jan 18, 2018, at 8:11 AM, Kostas Kloudas wrote:

Re: Flink CEP exception during RocksDB update

Kostas Kloudas
Hi Varun,

Thanks for taking the time to look into it. Could you share a sample input and the pattern you use, so that we can reproduce the problem?
That would help a lot in figuring out its cause.

Thanks,
Kostas

On Jan 23, 2018, at 5:40 PM, Varun Dhore wrote:

Re: Flink CEP exception during RocksDB update

Kostas Kloudas
Hi again Varun,

I am investigating the problem you mentioned and found a bug in the SharedBuffer,
but I am not sure it is the only bug affecting you.

Could you please try this branch https://github.com/kl0u/flink/tree/cep-inv and let me know
if the problem is still there?

In addition, are you using Scala with case classes or Java?

Thanks for helping fix the problem,
Kostas
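On the question of Scala case classes versus Java: one general Java fact that may be relevant here (an assumption; the thread does not say why the question is asked or whether it matters for this bug) is that hash-based lookups depend on `equals`/`hashCode`. A Scala case class, like a Java record, gets value-based equality automatically, whereas a plain Java class without overrides compares by identity, so a field-for-field copy of an event (for example, one produced by a serialization round trip) is not found under the original key. The class names below are hypothetical; the fields mirror the `Event` printed in the stack trace.

```java
import java.util.HashMap;
import java.util.Map;

public class EventEqualityDemo {

    // Hypothetical plain Java event that does NOT override equals/hashCode,
    // so two instances with identical fields are different hash keys.
    static final class PlainEvent {
        final int id;
        final String name;
        final long timestamp;

        PlainEvent(int id, String name, long timestamp) {
            this.id = id;
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical record (Java 16+): equals/hashCode are value-based,
    // similar in spirit to a Scala case class.
    record ValueEvent(int id, String name, long timestamp) {}

    // Store the original under id 0, then look it up with a separate copy.
    static <K> boolean copyIsFound(K original, K copy) {
        Map<K, Integer> ids = new HashMap<>();
        ids.put(original, 0);
        return ids.get(copy) != null;
    }

    public static void main(String[] args) {
        long ts = 1515593398897L;
        boolean plain = copyIsFound(new PlainEvent(1, "e1", ts), new PlainEvent(1, "e1", ts));
        boolean value = copyIsFound(new ValueEvent(1, "e1", ts), new ValueEvent(1, "e1", ts));
        System.out.println("plain class copy found: " + plain); // false
        System.out.println("record copy found: " + value);      // true
    }
}
```

If Java event classes are used with CEP, giving them consistent, value-based `equals` and `hashCode` implementations rules out this class of lookup failure, whatever the eventual root cause of this thread's bug turns out to be.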

On Jan 24, 2018, at 5:54 PM, Kostas Kloudas wrote:

Re: Flink CEP exception during RocksDB update

Kostas Kloudas
Hi Varun,

The branch I sent you earlier has now been merged into master.
Could you try master and let us know whether the behavior changes?
Has the problem been fixed, or has the exception message changed?

Thanks, 
Kostas

On Jan 29, 2018, at 10:09 AM, Kostas Kloudas <[hidden email]> wrote:

Hi again Varun,

I am investigating the problem you mentioned and I found a bug in the SharedBuffer, 
but I am not sure if it is the only bug that affects you.

Could you please try this branch https://github.com/kl0u/flink/tree/cep-inv and let me know
if the problem is still there?

In addition, are you using Scala with case classes or Java?

Thanks for helping fix the problem,
Kostas

On Jan 24, 2018, at 5:54 PM, Kostas Kloudas <[hidden email]> wrote:

Hi Varun,

Thanks for taking the time to look into it. Could you share a sample input and the pattern that reproduce the problem?
That would help a lot in figuring out the cause.

Thanks,
Kostas

On Jan 23, 2018, at 5:40 PM, Varun Dhore <[hidden email]> wrote:

Hi Kostas,

I was able to reproduce the error with 1.4.0. After upgrading the cluster to a 1.5 snapshot and running the same data through it, I still get the same exception. The CEP patterns I am running use followedBy, e.g. AfBfC. In my experience, I have never been able to get stable execution with checkpoints enabled; with checkpoints disabled, the CEP jobs run fine. Aside from this particular error, I also notice that the majority of checkpoints expire because they do not complete within the configured 5-minute timeout. Any suggestions for further debugging checkpoints at runtime would be very helpful.
Thanks in advance for your assistance.

Regards,
Varun 
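The followedBy patterns mentioned above (A followedBy B followedBy C, "AfBfC") use relaxed contiguity: non-matching events in between are skipped. As a hypothetical, stdlib-only illustration (not Flink's NFA, and ignoring within(), after-match skip strategies and iterative conditions), the sketch below treats every event whose name starts with "a" as opening a partial match that takes the first following "b" and then the first "c" after that. The event names and helper methods are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of "A followedBy B followedBy C" (relaxed contiguity); NOT Flink's NFA.
// Ignores within(), after-match skip strategies and iterative conditions.
public class FollowedBySketch {

    // Returns every match as positions [iA, iB, iC]: each A opens a partial match that
    // takes the first B after it and then the first C after that B.
    static List<int[]> matchAfBfC(List<String> events) {
        List<int[]> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (!events.get(i).startsWith("a")) {
                continue;
            }
            int j = indexOfNext(events, "b", i + 1);
            if (j < 0) {
                continue; // in a real CEP job this partial match would stay in keyed state
            }
            int k = indexOfNext(events, "c", j + 1);
            if (k < 0) {
                continue; // likewise still pending: waiting for a C
            }
            matches.add(new int[] {i, j, k});
        }
        return matches;
    }

    // Index of the first event at or after 'from' whose name starts with 'prefix', or -1.
    static int indexOfNext(List<String> events, String prefix, int from) {
        for (int i = from; i < events.size(); i++) {
            if (events.get(i).startsWith(prefix)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a1", "x", "a2", "b1", "b2", "c1");
        for (int[] m : matchAfBfC(events)) {
            System.out.println(events.get(m[0]) + " " + events.get(m[1]) + " " + events.get(m[2]));
        }
    }
}
```

For the input a1 x a2 b1 b2 c1, main prints "a1 b1 c1" and "a2 b1 c1": x is ignored, and b2 does not open a second branch for the same A (that is what followedByAny would do). In a real job, every A that has not completed yet is kept as a partial match in the NFA's shared buffer, and the stack trace above shows that the whole NFA is serialized on each RocksDB state update.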

On Jan 18, 2018, at 8:11 AM, Kostas Kloudas <[hidden email]> wrote:

Thanks a lot Varun!

Kostas

On Jan 17, 2018, at 9:59 PM, Varun Dhore <[hidden email]> wrote:

Thank you, Kostas. Since this error is not easily reproducible on my end, I’ll continue testing and confirm the resolution once I am able to.

Thanks,
Varun 


On Jan 15, 2018, at 10:21 AM, Kostas Kloudas <[hidden email]> wrote:

Hi Varun,

This may be related to this issue: https://issues.apache.org/jira/browse/FLINK-8226
which is already fixed on master.

Could you please try the current master to see if the error persists?

Thanks,
Kostas

On Jan 15, 2018, at 4:09 PM, Varun Dhore <[hidden email]> wrote:

[quoted text hidden]