Hi all,
I am doing a simple word count example and want to checkpoint the accumulated word counts. I am not having any luck getting the counts saved and restored. Can someone help?

    env.enableCheckpointing(1000)
    env.setStateBackend(new MemoryStateBackend())

    inStream

Thanks,
Jack Huang
Hi,
what seems to be the problem?

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 03:52 Jack Huang <[hidden email]> wrote:
Hi Jack,
it seems you correctly enabled checkpointing by calling `env.enableCheckpointing`. However, your UDFs have to either implement the Checkpointed interface or use the Key/Value State interface to make sure the state of the computation is snapshotted. The documentation explains, far better than I could in this post, how to define your functions so that they checkpoint their state: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/state.html

I hope I've been of some help; I'll gladly help you further if you need it.

On Wed, Apr 20, 2016 at 11:34 AM, Aljoscha Krettek <[hidden email]> wrote:
BR, Stefano Baghino
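For illustration, a minimal sketch of the Key/Value State approach described above, assuming the ValueState/ValueStateDescriptor API from the linked Flink 1.0 documentation (the class name, state name, and input type are made up for this example, and the function has to run on a keyed stream, e.g. after a keyBy):

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    // Keeps a running count per key; because the count lives in Flink's Key/Value
    // state, it is included in checkpoints automatically.
    class CountWithKeyValueState extends RichFlatMapFunction[(String, Int), (String, Int)] {

      private var count: ValueState[Int] = _

      override def open(parameters: Configuration): Unit = {
        // The state name "wordCount" and the default value 0 are illustrative.
        count = getRuntimeContext.getState(
          new ValueStateDescriptor[Int]("wordCount", createTypeInformation[Int], 0))
      }

      override def flatMap(in: (String, Int), out: Collector[(String, Int)]): Unit = {
        val newCount = count.value() + in._2
        count.update(newCount)
        out.collect((in._1, newCount))
      }
    }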
Hi,
the *withState() family of functions uses the Key/Value state interface internally, so that should work.

On Wed, 20 Apr 2016 at 12:33 Stefano Baghino <[hidden email]> wrote:
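To make that concrete, here is a rough sketch of the *withState() variant of the word count, assuming the Scala KeyedStream.mapWithState signature from 1.0 (a function (T, Option[S]) => (R, Option[S])); `inStream` is the Kafka-backed DataStream[String] from the job, and the rest of the pipeline is illustrative:

    import org.apache.flink.streaming.api.scala._

    val counts: DataStream[(String, Int)] = inStream
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(0)
      .mapWithState[(String, Int), Int] { (in: (String, Int), state: Option[Int]) =>
        val newCount = state.getOrElse(0) + in._2
        // Emit the running count for this word and store it back as the key's state.
        ((in._1, newCount), Some(newCount))
      }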
My bad, thanks for pointing that out.

On Wed, Apr 20, 2016 at 1:49 PM, Aljoscha Krettek <[hidden email]> wrote:
BR, Stefano Baghino
@Aljoscha: For this word count example I am using a Kafka topic as the input stream. The problem is that when I cancel the task and restart it, the task loses the accumulated word counts so far and starts counting from 1 again. Am I missing something basic here?

@Stefano: I also tried to implement the Checkpointed interface but had no luck either. Canceling and restarting the task did not restore the state. Here is my class:

    inStream.flatMap({ _.toLowerCase.split("\\W+") filter { _.nonEmpty } })

    class StatefulCounter extends RichMapFunction[String, (String,Int)] with Checkpointed[Integer] {

Thanks,
Jack Huang

On Wed, Apr 20, 2016 at 5:36 AM, Stefano Baghino <[hidden email]> wrote:
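For comparison, a minimal sketch of how the rest of such a class might look; this is not Jack's actual class, the counting logic is made up, and it assumes the Flink 1.0 Checkpointed interface with its snapshotState/restoreState methods:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.streaming.api.checkpoint.Checkpointed

    class StatefulCounter extends RichMapFunction[String, (String, Int)]
        with Checkpointed[Integer] {

      // Checkpointed state is per parallel operator instance, not per key, so this
      // counts all words seen by one subtask.
      private var count: Int = 0

      override def map(word: String): (String, Int) = {
        count += 1
        (word, count)
      }

      // Called by Flink whenever a checkpoint is taken.
      override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Integer =
        count

      // Called by Flink when the job is restored after a failure (or from a savepoint).
      override def restoreState(state: Integer): Unit = {
        count = state
      }
    }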
Hello again, thanks for giving my advice a shot anyway, but Aljoscha is far more knowledgeable than me regarding Flink. :)

I hope I'm not getting mixed up again, but I think gracefully canceling your job means you lose your job state. Am I right in saying that the state is preserved in case of abnormal termination (e.g. the JobManager crashes) or if you explicitly create a savepoint?

On Wed, Apr 20, 2016 at 10:13 PM, Jack Huang <[hidden email]> wrote:
BR, Stefano Baghino
Hi,
yes, Stefano is spot on! The state is only restored if a job is restarted because of a failure. For state that survives stopping/canceling a job you can look at savepoints: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/savepoints.html

This essentially uses the same mechanisms as the fault-tolerance state snapshots, but makes it explicit and allows restarting from different savepoints.

Cheers,
Aljoscha

On Wed, 20 Apr 2016 at 22:43 Stefano Baghino <[hidden email]> wrote:
@Stefano, Aljoscha: Thank you for pointing that out. With the following steps I verified that the state of the job gets restored:

Jack

Jack Huang

On Thu, Apr 21, 2016 at 1:40 AM, Aljoscha Krettek <[hidden email]> wrote: