Flink job restart at checkpoint interval

scgupta
Hi,

I am using ValueState, backed by FsStateBackend on HDFS, as follows:

env.setStateBackend(new FsStateBackend(stateBackendPath))
env.enableCheckpointing(checkpointInterval)

It is a non-iterative job running on Flink/YARN. The job restarts at every checkpoint interval; I have tried intervals ranging from 30 seconds to 10 minutes. Any idea why it could be restarting?

I see the following exception in the log:

======
2016-11-14 09:24:28,787 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Filter -> cell_users_update (1/1) (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING
2016-11-14 09:24:28,788 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 03a56958263a688dc34cc8d5069aac8f (Processor) changed to FAILING.
java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215)
	at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
	at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException: DataStreamer Exception: 
	at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200)
	at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
	at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498)
	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
	... 8 more
Caused by: java.io.IOException: DataStreamer Exception: 
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563)
Caused by: java.lang.ExceptionInInitializerError
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
Caused by: java.lang.RuntimeException: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
	at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648)
	at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
	at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
	at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
	at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
	at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
	at org.apache.hadoop.hdfs.protocol.HdfsConstants.<clinit>(HdfsConstants.java:76)
	... 3 more
Caused by: javax.xml.parsers.ParserConfigurationException: Feature 'http://apache.org/xml/features/xinclude' is not recognized.
	at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown Source)
	at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530)
	... 9 more
2016-11-14 09:24:28,789 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Filter -> device_status_update (1/1) (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING
2016-11-14 09:24:28,789 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Filter -> Map -> Filter -> cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched from RUNNING to CANCELING
======

Thanks,
+satish

Re: Flink job restart at checkpoint interval

Ufuk Celebi
There seems to be an Exception happening when Flink tries to serialize the state of your operator (see the stack trace).

What are you trying to store via the ValueState? Maybe you can share a code excerpt?

– Ufuk


Re: Flink job restart at checkpoint interval

scgupta
Most of the ValueStates I am using hold a Long or a Boolean, except one, which holds a map from Long to a Scala case class:
ValueState[Map[Long, AnScalaCaseClass]]

Does this serialization happen only for the value state members of operators, or also other private fields?
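
For readers following along, here is a minimal sketch of how such a state could be declared with explicit type information. It assumes Flink's Scala streaming API is on the classpath and that the function runs on a keyed stream; `CellUser`, `TrackUsers`, and the state name `"users"` are hypothetical stand-ins, not names from the job above. Only state registered through the runtime context (here `users`) takes part in a checkpoint snapshot; ordinary private fields of the function object do not.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-in for AnScalaCaseClass.
case class CellUser(id: Long, name: String)

// Must be applied to a keyed stream, e.g. stream.keyBy(_._1).flatMap(new TrackUsers).
class TrackUsers extends RichFlatMapFunction[(Long, CellUser), Int] {

  // Handle to the keyed state; assigned in open(), not serialized with the function.
  @transient private var users: ValueState[Map[Long, CellUser]] = _

  override def open(parameters: Configuration): Unit = {
    // createTypeInformation comes from the Scala API package object and derives
    // the type information at compile time (assumption: this should select a
    // Scala-aware serializer instead of a generic fallback).
    val descriptor = new ValueStateDescriptor[Map[Long, CellUser]](
      "users",
      createTypeInformation[Map[Long, CellUser]],
      Map.empty[Long, CellUser]) // default value returned before the first update
    users = getRuntimeContext.getState(descriptor)
  }

  override def flatMap(in: (Long, CellUser), out: Collector[Int]): Unit = {
    val updated = users.value() + (in._1 -> in._2)
    users.update(updated)
    out.collect(updated.size)
  }
}
```

The three-argument descriptor constructor (name, type information, default value) is the form available in the Flink versions of that period; newer releases may prefer a different constructor.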
Thanks
+satish


Re: Flink job restart at checkpoint interval

Till Rohrmann
Hi Satish,

Your problem seems to be related more to reading Hadoop's configuration. According to the internet [1,2,3], selecting a proper xerces version should resolve it.
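
As a rough way to check which XML parser is actually picked up, the following sketch uses only JDK classes. `XmlParserProbe` is a hypothetical name. It reports the `DocumentBuilderFactory` implementation that JAXP selects and where it was loaded from, then tries to build a parser with XInclude enabled, on the assumption that this is the setting behind the "Feature 'http://apache.org/xml/features/xinclude' is not recognized" error in the trace.

```scala
import javax.xml.parsers.{DocumentBuilderFactory, ParserConfigurationException}

object XmlParserProbe {

  /** Names the factory implementation JAXP selects and the location it was loaded from. */
  def describeFactory(): String = {
    val cls = DocumentBuilderFactory.newInstance().getClass
    val origin = Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(cs => Option(cs.getLocation))
      .map(_.toString)
      .getOrElse("the JDK itself (no code source)")
    s"${cls.getName} from $origin"
  }

  /** True if the selected parser can be built with XInclude enabled. */
  def supportsXInclude(): Boolean = {
    val factory = DocumentBuilderFactory.newInstance()
    try {
      factory.setXIncludeAware(true)
      factory.newDocumentBuilder()
      true
    } catch {
      case _: UnsupportedOperationException | _: ParserConfigurationException => false
    }
  }

  def main(args: Array[String]): Unit = {
    println(describeFactory())
    println(s"XInclude supported: ${supportsXInclude()}")
  }
}
```

The result is only meaningful when this runs with the same classpath as the job (for example, from inside the job jar), since an old xerces jar pulled in by a dependency is what changes the outcome.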


Cheers,
Till


Re: Flink job restart at checkpoint interval

scgupta
Hi Ufuk and Till,

Thanks a lot. Both suggestions were useful. An older version of xerces was being loaded from one of the dependencies; I also fixed the serialization glitch in my code, and checkpointing now works.

I have 5 value states, apart from a custom trigger. Is there any way I can configure the name of the file in which these checkpoints are saved? For example, in:

<configured-checkpoint-path>/1193cd5ef0c8de256a059e363dfcb26c/chk-20/f1c9cf97-f5fa-44e2-81df-7729cd8226be

Can I specify something somewhere that gets included in the file ID?

The reason I am asking is that one checkpoint file keeps growing, and I suspect a bug in my logic is causing a memory leak; I want to identify which value state is responsible. Any suggestions?

Thanks,
+satish



Re: Flink job restart at checkpoint interval

Till Rohrmann-2
Hi Satish,

I'm afraid there is currently no way to configure the name of the checkpoint file for a task. For the latest checkpoint, you can see the state sizes of the individual subtasks in the web UI under Checkpoints.
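
Since the web UI reports sizes per subtask rather than per state, one possible workaround is to measure each state value inside the job and log the result next to the state's name. The sketch below is only a rough proxy: `StateSizeProbe` and `CabPosition` are hypothetical names, and it uses plain Java serialization, not the serializer Flink uses for the checkpoint, so the numbers are useful for spotting which value keeps growing rather than for predicting file sizes.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object StateSizeProbe {

  /** Size in bytes of `value` under plain Java serialization (a proxy, not Flink's format). */
  def serializedSize(value: AnyRef): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(value) finally out.close()
    buffer.size()
  }
}

// Hypothetical case class standing in for the value type kept in state.
final case class CabPosition(lat: Double, lon: Double)

object StateSizeProbeDemo {
  def main(args: Array[String]): Unit = {
    val small = Map(1L -> CabPosition(1.0, 2.0))
    val large = (1L to 1000L).map(i => i -> CabPosition(i.toDouble, i.toDouble)).toMap
    println(s"small map: ${StateSizeProbe.serializedSize(small)} bytes")
    println(s"large map: ${StateSizeProbe.serializedSize(large)} bytes")
  }
}
```

Calling `serializedSize` on the value just before each `update` and logging it occasionally, rather than on every record, keeps the overhead manageable.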

Cheers,
Till

On Tue, Nov 15, 2016 at 10:52 PM, Satish Chandra Gupta <[hidden email]> wrote:
Hi Ufuk and Till,

Thanks a lot. Both these suggestions were useful. Older version of xerces was being loaded from one of the dependencies, and I also fixed the serialization glitch in my code, and now checkpointing works.

I have 5 value states apart from a custom trigger, and a custom trigger. Is there anyway I can configure the filename in which these checkpoints are saved. For example in:

<configured-checkpoint-path>/1193cd5ef0c8de256a059e363dfcb26c/chk-20/f1c9cf97-f5fa-44e2-81df-7729cd8226be

Can I specify somewhere that is included in the file id.

The reason I am asking is, one checkpoint file keeps growing and I suspect some bug in my logic that is causing memory leak, and I want to identify which value state is causing this. Any suggestions?

Thanks,
+satish


On Tue, Nov 15, 2016 at 4:01 PM, Till Rohrmann <[hidden email]> wrote:
Hi Satish,

your problem seems to be more related to a problem reading Hadoop's configuration. According to the internet [1,2,3] try to select a proper xerces version to resolve the problem.


Cheers,
Till

On Tue, Nov 15, 2016 at 3:24 AM, Satish Chandra Gupta <[hidden email]> wrote:
Most of the ValueState I am using are of Long or Boolean, except one which is a map of Long to Scala case class:
ValueState[Map[Long, AnScalaCaseClass]]

Does this serialization happen only for the value state members of operators, or also other private fields?
Thanks
+satish

On Mon, Nov 14, 2016 at 3:47 PM, Ufuk Celebi <[hidden email]> wrote:
There seems to be an Exception happening when Flink tries to serialize the state of your operator (see the stack trace).

What are you trying to store via the ValueState? Maybe you can share a code excerpt?

– Ufuk

On 14 November 2016 at 10:51:06, Satish Chandra Gupta ([hidden email]) wrote:
> Hi,
>
> I am using Value State, backed by FsStateBackend on hdfs, as following:
>
> env.setStateBackend(new FsStateBackend(stateBackendPath))
> env.enableCheckpointing(checkpointInterval)
>
>
> It is non-iterative job running Flink/Yarn. The job restarts at
> checkpointInterval, I have tried interval varying from 30 sec to 10 min.
> Any idea why it could be restarting.
>
> I see following exception in the log:
>
> ======
>
> 2016-11-14 09:24:28,787 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Source: Custom Source -> Map -> Filter -> cell_users_update (1/1)
> (fd72961bedbb0f18bffb5ae66b926313) switched from RUNNING to CANCELING
> 2016-11-14 09:24:28,788 INFO org.apache.flink.yarn.YarnJobManager
> - Status of job 03a56958263a688dc34cc8d5069aac8f
> (Processor) changed to FAILING.*java.lang.RuntimeException: Error
> triggering a checkpoint as the result of receiving checkpoint barrier*
> at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:701)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:691)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
> at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:215)
> at org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask.run(TwoInputStreamTask.java:89)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: com.esotericsoftware.kryo.KryoException:
> java.io.IOException: DataStreamer Exception:
> at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:200)
> at org.apache.flink.runtime.state.filesystem.AbstractFsState.snapshot(AbstractFsState.java:85)
> at org.apache.flink.runtime.state.AbstractStateBackend.snapshotPartitionedState(AbstractStateBackend.java:265)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotOperatorState(AbstractStreamOperator.java:176)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotOperatorState(AbstractUdfStreamOperator.java:121)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:498)
> at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:695)
> ... 8 more
> Caused by: java.io.IOException: DataStreamer Exception:
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:563)
> Caused by: java.lang.ExceptionInInitializerError
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1322)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1266)
> at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)
> Caused by: java.lang.RuntimeException:
> javax.xml.parsers.ParserConfigurationException: Feature
> 'http://apache.org/xml/features/xinclude' is not recognized.
> at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2648)
> at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2492)
> at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2405)
> at org.apache.hadoop.conf.Configuration.get(Configuration.java:981)
> at org.apache.hadoop.conf.Configuration.getTrimmed(Configuration.java:1031)
> at org.apache.hadoop.conf.Configuration.getInt(Configuration.java:1251)
> at org.apache.hadoop.hdfs.protocol.HdfsConstants.<clinit>(HdfsConstants.java:76)
> ... 3 more
> Caused by: javax.xml.parsers.ParserConfigurationException: Feature
> 'http://apache.org/xml/features/xinclude' is not recognized.
> at org.apache.xerces.jaxp.DocumentBuilderFactoryImpl.newDocumentBuilder(Unknown Source)
> at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2530)
> ... 9 more
> 2016-11-14 09:24:28,789 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Source: Custom Source -> Map -> Filter -> device_status_update (1/1)
> (9fe20e7a4336b3960b88febc89135d97) switched from RUNNING to CANCELING
> 2016-11-14 09:24:28,789 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph -
> Source: Custom Source -> Map -> Filter -> Map -> Filter ->
> cab_position_update (1/1) (91ea224efa3ba7d130405fbd247f4a45) switched
> from RUNNING to CANCELING
>
> ======
>
> Thanks,
> +satish
>






Re: Flink job restart at checkpoint interval

scgupta
Hi Till,

Thanks. Yes, that is what I have been doing. But accessing the web UI of Flink running on a YARN cluster on EMR over VPN sometimes becomes very slow (sometimes not even the execution plan gets shown :-)), which is why I thought of this.

Thanks,
+satish

On Wed, Nov 16, 2016 at 6:46 PM, Till Rohrmann <[hidden email]> wrote:
Hi Satish,

I'm afraid there is currently no way to configure the name of the checkpoint file for a task. For the latest checkpoint, you can see the state sizes of the individual subtasks in the web UI under Checkpoints.

Cheers,
Till

On Tue, Nov 15, 2016 at 10:52 PM, Satish Chandra Gupta <[hidden email]> wrote:
Hi Ufuk and Till,

Thanks a lot. Both suggestions were useful. An older version of xerces was being loaded from one of the dependencies. I also fixed the serialization glitch in my code, and checkpointing now works.

I have 5 value states, apart from a custom trigger. Is there any way I can configure the filename under which these checkpoints are saved? For example, in:

<configured-checkpoint-path>/1193cd5ef0c8de256a059e363dfcb26c/chk-20/f1c9cf97-f5fa-44e2-81df-7729cd8226be

Can I specify something somewhere that gets included in the file id?

The reason I am asking is that one checkpoint file keeps growing. I suspect a bug in my logic that is causing a memory leak, and I want to identify which value state is responsible. Any suggestions?

Thanks,
+satish
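
Since the checkpoint file name cannot be configured, one way to narrow down which value state is growing is to log an approximate size for each state's current value from inside the job. The following is only a minimal sketch under stated assumptions: StateSizeProbe and the state names in main are hypothetical, and plain Java serialization stands in for Flink's own serializer, so the numbers are indicative of growth rather than equal to the bytes written to the checkpoint.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for spotting which state value keeps growing; not a Flink API. */
final class StateSizeProbe {

    /** Approximate size in bytes, using Java serialization as a stand-in for Flink's serializer. */
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    /** Maps each state name to the approximate serialized size of its current value. */
    static Map<String, Integer> sizes(Map<String, ? extends Serializable> states) {
        Map<String, Integer> result = new LinkedHashMap<>();
        for (Map.Entry<String, ? extends Serializable> e : states.entrySet()) {
            result.put(e.getKey(), serializedSize(e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical state values: two small scalars and one map that has grown.
        HashMap<Long, String> positions = new HashMap<>();
        for (long i = 0; i < 1000; i++) {
            positions.put(i, "entry-" + i);
        }
        Map<String, Serializable> states = new LinkedHashMap<>();
        states.put("lastSeen", 42L);
        states.put("active", Boolean.TRUE);
        states.put("positions", positions);

        sizes(states).forEach((name, size) -> System.out.println(name + ": " + size + " bytes"));
    }
}
```

Logging these sizes periodically (for example, every N-th element) should show which state grows over time; a map-valued state that is never pruned is the usual suspect.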


On Tue, Nov 15, 2016 at 4:01 PM, Till Rohrmann <[hidden email]> wrote:
Hi Satish,

Your problem seems to be related to reading Hadoop's configuration. According to the internet [1,2,3], selecting a proper xerces version should resolve the problem.


Cheers,
Till
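
To check which XML parser implementation actually ends up on the classpath, a small JDK-only probe like the one below can be run with the same dependencies as the job. This is a sketch, not part of Flink or Hadoop: XmlFactoryProbe is a hypothetical name, and it assumes that the failing call in Hadoop's Configuration.loadResource asks JAXP for an XInclude-aware parser, which is what the "xinclude ... is not recognized" message in the stack trace suggests.

```java
import java.security.CodeSource;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

/** Hypothetical diagnostic helper; not part of Flink or Hadoop. */
final class XmlFactoryProbe {

    /** Returns the factory implementation class and where it was loaded from. */
    static String describeFactory(DocumentBuilderFactory factory) {
        Class<?> impl = factory.getClass();
        CodeSource source = impl.getProtectionDomain().getCodeSource();
        // Classes shipped with the JDK typically have no code source location.
        String origin = (source == null || source.getLocation() == null)
                ? "JDK built-in"
                : source.getLocation().toString();
        return impl.getName() + " loaded from " + origin;
    }

    /** Returns true if an XInclude-aware DocumentBuilder can be created. */
    static boolean supportsXInclude(DocumentBuilderFactory factory) {
        try {
            factory.setNamespaceAware(true);
            factory.setXIncludeAware(true);
            factory.newDocumentBuilder();
            return true;
        } catch (ParserConfigurationException | UnsupportedOperationException e) {
            // Older parser implementations reject the XInclude feature here.
            return false;
        }
    }

    public static void main(String[] args) {
        DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        System.out.println(describeFactory(factory));
        System.out.println("XInclude supported: " + supportsXInclude(factory));
    }
}
```

If the reported class is org.apache.xerces.jaxp.DocumentBuilderFactoryImpl and it comes from a dependency jar, that jar is the likely source of the outdated xerces and a candidate for exclusion or upgrade.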

On Tue, Nov 15, 2016 at 3:24 AM, Satish Chandra Gupta <[hidden email]> wrote:
Most of the ValueStates I am using hold a Long or a Boolean, except one, which holds a map from Long to a Scala case class:
ValueState[Map[Long, AnScalaCaseClass]]

Does this serialization happen only for the value state members of an operator, or also for its other private fields?
Thanks
+satish

On Mon, Nov 14, 2016 at 3:47 PM, Ufuk Celebi <[hidden email]> wrote:
An exception seems to be thrown when Flink tries to serialize the state of your operator (see the stack trace).

What are you trying to store via the ValueState? Maybe you can share a code excerpt?

– Ufuk
