Checkpoint and restore states

Jack Huang
Hi all,

I am doing a simple word count example and want to checkpoint the accumulated word counts. I am not having any luck getting the counts saved and restored. Can someone help?

env.enableCheckpointing(1000)
env.setStateBackend(new MemoryStateBackend())

...

inStream
    .keyBy({s => s})
    .mapWithState((in: String, count: Option[Int]) => {
        val newCount = count.getOrElse(0) + 1
        ((in, newCount), Some(newCount))
    })
    .print()


Thanks,

Jack Huang

Re: Checkpoint and restore states

Aljoscha Krettek
Hi,
what seems to be the problem?

Cheers,
Aljoscha


Re: Checkpoint and restore states

stefanobaghino
Hi Jack,

It seems you correctly enabled checkpointing by calling `env.enableCheckpointing`. However, your UDFs have to either implement the Checkpointed interface or use the Key/Value State interface to make sure the state of the computation is snapshotted.

The documentation explains how to define your functions so that they checkpoint the state far better than I could in this post: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html

I hope I've been of some help; I'll gladly help you further if you need it.
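
As a rough illustration of the Key/Value State interface mentioned above, here is a minimal sketch of a per-key counter. It assumes the Flink 1.x-era API, where `ValueStateDescriptor` takes a name, a type class, and a default value (later releases changed these constructors). The class name `KeyedCounter` and the state name "count" are made up for this example.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration

// Hypothetical example class: counts occurrences per key.
// It only works on a keyed stream, i.e. after keyBy.
class KeyedCounter extends RichMapFunction[String, (String, Int)] {
  @transient private var count: ValueState[Integer] = _

  override def open(parameters: Configuration): Unit = {
    // "count" is an arbitrary state name; 0 is the default for unseen keys
    // (assumes the three-argument descriptor constructor of Flink 1.x).
    val descriptor =
      new ValueStateDescriptor[Integer]("count", classOf[Integer], Integer.valueOf(0))
    count = getRuntimeContext.getState(descriptor)
  }

  override def map(in: String): (String, Int) = {
    val newCount = count.value() + 1 // value() is scoped to the current key
    count.update(newCount)           // the updated value is what gets checkpointed
    (in, newCount)
  }
}
```

It would be used as `inStream.keyBy({s => s}).map(new KeyedCounter)`; without the `keyBy`, the call to `getState` is expected to fail because there is no key to scope the state to.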

--
BR,
Stefano Baghino

Software Engineer @ Radicalbit

Re: Checkpoint and restore states

Aljoscha Krettek
Hi,
The *withState() family of functions uses the Key/Value state interface internally, so that should work.
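
To make the per-key behaviour concrete, here is a toy model in plain Scala with no Flink dependency. It is not Flink's implementation, only a sketch of the contract: the function receives the input together with the state stored for that input's key, and returns the output plus the new state. The names `MapWithStateModel`, `keyOf`, and `wordCounts` are invented for this example, and a mutable map stands in for the key/value state.

```scala
import scala.collection.mutable

object MapWithStateModel {
  // Toy model only: applies `fun` to each input with the state stored for its key.
  def mapWithState[T, K, S, R](inputs: Seq[T], keyOf: T => K)(
      fun: (T, Option[S]) => (R, Option[S])): Seq[R] = {
    val stateByKey = mutable.Map.empty[K, S] // stands in for the key/value state
    inputs.map { in =>
      val key = keyOf(in)
      val (out, newState) = fun(in, stateByKey.get(key))
      newState match {
        case Some(s) => stateByKey.update(key, s) // store the updated state
        case None    => stateByKey.remove(key)    // None clears the state
      }
      out
    }
  }

  // The counting function from the original post, run through the model.
  def wordCounts(words: Seq[String]): Seq[(String, Int)] =
    mapWithState[String, String, Int, (String, Int)](words, s => s) { (in, count) =>
      val newCount = count.getOrElse(0) + 1
      ((in, newCount), Some(newCount))
    }

  def main(args: Array[String]): Unit =
    println(wordCounts(Seq("a", "b", "a", "a")))
    // prints List((a,1), (b,1), (a,2), (a,3))
}
```

In a real job, the map in this model corresponds to state that Flink snapshots on each checkpoint, which is why the original `mapWithState` code needs no extra interface to be checkpointed.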


Re: Checkpoint and restore states

stefanobaghino
My bad, thanks for pointing that out.


Re: Checkpoint and restore states

Jack Huang
@Aljoscha:
For this word count example I am using a Kafka topic as the input stream. The problem is that when I cancel the task and restart it, the task loses the word counts accumulated so far and starts counting from 1 again. Am I missing something basic here?

@Stefano:
I also tried to implement the Checkpointed interface but had no luck either. Canceling and restarting the task did not restore the state. Here is my class:

inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
  .keyBy({s => s})
  .map(new StatefulCounter)

class StatefulCounter extends RichMapFunction[String, (String, Int)] with Checkpointed[Integer] {
  // Note: this field is held per parallel operator instance, not per key.
  private var count: Integer = 0

  override def map(in: String): (String, Int) = {
    count += 1
    (in, count)
  }

  // Called on each checkpoint; returns the state to snapshot.
  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer = count

  // Called on recovery with the last snapshotted state.
  override def restoreState(state: Integer): Unit = {
    count = state
  }
}


Thanks,


Jack Huang


Re: Checkpoint and restore states

stefanobaghino
Hello again,

Thanks for giving my advice a shot anyway, but Aljoscha is far more knowledgeable than me regarding Flink. :)

I hope I'm not getting mixed up again, but I think gracefully canceling your job means you lose your job state. Am I right in saying that the state is preserved in case of abnormal termination (e.g., the JobManager crashes) or if you explicitly create a savepoint?


Re: Checkpoint and restore states

Aljoscha Krettek
Hi,
Yes, Stefano is spot on! The state is only restored if a job is restarted because of an abnormal failure. For state that survives stopping/canceling a job, you can look at savepoints: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html. Savepoints essentially use the same mechanisms as the fault-tolerance handling of state, but make it explicit and allow restarting from different savepoints.

Cheers,
Aljoscha


Re: Checkpoint and restore states

Jack Huang
@Stefano, Aljoscha:

Thank you for pointing that out. With the following steps I verified that the state of the job gets restored:
  1. Use HDFS as the state backend with env.setStateBackend(new FsStateBackend("hdfs:///home/user/flink/KafkaWordCount"))
  2. Start the job. In my case the job ID is e4b5316ae4ea0c8ed6fab4fa238b4b2f
  3. Observe that hdfs:///home/user/flink/KafkaWordCount/e4b5316ae4ea0c8ed6fab4fa238b4b2f is created
  4. Kill all TaskManagers, but leave the JobManager running
  5. Restart all TaskManagers with bin/start-cluster.sh
  6. Observe that the JobManager automatically restarts the job under the same job ID
  7. Observe from the output that the state is restored
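
For reference, the job setup from step 1 could look roughly like the sketch below. It assumes the Flink 1.x-era Scala API (`FsStateBackend` from `org.apache.flink.runtime.state.filesystem`) and swaps the Kafka topic for a socket source on a made-up host and port so that no connector dependency is needed. The object name and job name are also invented for this example.

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.scala._

object CheckpointedWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Take a checkpoint every 1000 ms and store it in HDFS (path from step 1).
    env.enableCheckpointing(1000)
    env.setStateBackend(new FsStateBackend("hdfs:///home/user/flink/KafkaWordCount"))

    // Assumption: a socket source stands in for the Kafka topic used in the thread.
    val inStream: DataStream[String] = env.socketTextStream("localhost", 9999)

    inStream
      .flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })
      .keyBy({s => s})
      // Per-key count, kept in key/value state and included in checkpoints.
      .mapWithState((in: String, count: Option[Int]) => {
        val newCount = count.getOrElse(0) + 1
        ((in, newCount), Some(newCount))
      })
      .print()

    env.execute("Checkpointed word count")
  }
}
```

With this setup, the restore in steps 4 to 7 happens automatically after a failure. Canceling the job and submitting it again starts from empty state unless the job is resumed from a savepoint.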

Jack



Jack Huang
