Unable to make mapWithState work correctly

Victor Godoy Poluceno
Hi,

I am trying to write a simple streaming program to count values from a Kafka topic in a fault tolerant manner, like this:

<code>
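import java.util.Properties

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

// Use the filesystem state backend and write checkpoints to a local directory.
val config: Configuration = new Configuration()
config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")

// Local environment with exactly-once checkpoints every 10 ms.
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
env.enableCheckpointing(10)

// Kafka consumer settings.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")

val stream = env
    .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
    .map((_, 1))
    .keyBy(_._1)
    // Keep a running count per key in keyed state.
    .mapWithState((in: (String, Int), count: Option[Int]) => {
      val newCount = in._2 + count.getOrElse(0)
      ((in._1, newCount), Some(newCount))
    }).print

env.execute("Job")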
</code>

The idea is to use the filesystem state backend to persist the computation state (the count) and to restore it after a failure or restart. I have a program that injects the same key into Kafka. However, I am unable to make this work correctly: every time Flink restarts, the state is empty, so the count starts from zero. What am I missing here?

I am running this on a local environment (sbt run) with Flink 1.3.1, Java 1.8.0_131, and Ubuntu 16.04.

--
hooray!

--
Victor Godoy Poluceno

Re: Unable to make mapWithState work correctly

Nico Kruber
Hi Victor,
from a quick look at your code, I think you set up everything just fine (I'm
not too familiar with Scala though), but the problem is probably somewhere
else:
As [1] states (a bit hidden maybe), checkpoints are only used to recover from
failures, e.g. if you run your job on 2 task managers and one of them dies. In
that case, Flink's job manager will try to re-schedule the job and restart it
from the latest checkpoint.

I guess what you want is a savepoint [2] (or an externalized checkpoint
described in [1]) to be able to restore your program manually during start. If
you run your program in a "real" Flink environment as started from one of our
startup scripts, you can go straight ahead to
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#operations
to see how to create savepoints and restore from them.
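
For the externalized checkpoint variant, a minimal sketch of the job-side setting might look like the following. The object name, the helper method, and the 10-second interval are only illustrative assumptions, and the retention mode is one possible choice; the target directory for the checkpoint metadata is not set here and has to be configured separately via `state.checkpoints.dir` in the Flink configuration, as described in [1].

```scala
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

// Hypothetical helper, not part of the original program.
object ExternalizedCheckpointSketch {
  def configure(env: StreamExecutionEnvironment): Unit = {
    // Assumed interval of 10 seconds; pick whatever fits the job.
    env.enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE)

    // Keep the latest checkpoint when the job is cancelled, so that it can
    // be used for a manual restore later (assumed retention choice).
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
  }
}
```

A job submitted through the command-line client can then be resumed from the retained checkpoint (or from a savepoint) by passing its path with the `-s` option of `bin/flink run`.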


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html

On Tuesday, 25 July 2017 14:45:49 CEST Victor Godoy Poluceno wrote:

> Hi,
>
> I am trying to write a simple streaming program to count values from a
> Kafka topic in a fault tolerant manner, like this
> <https://gist.github.com/victorpoluceno/8690df8459bf3afd60477f83ec78f7a8>:
>
> <code>
> val config: Configuration = new Configuration()
> config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
> config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")
>
> val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
> env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.enableCheckpointing(10)
>
> val properties = new Properties();
> properties.setProperty("bootstrap.servers", "localhost:9092");
> properties.setProperty("group.id", "test");
>
> val stream = env
>     .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
>     .map((_, 1))
>     .keyBy(_._1)
>     .mapWithState((in: (String, Int), count: Option[Int]) => {
>       val newCount = in._2 + count.getOrElse(0)
>       ((in._1, newCount), Some(newCount))
>     }).print
>
> env.execute("Job")
> </code>
>
> The idea is to use the filesystem state backend to persist the computation
> state (the count) and to restore it after a failure or restart. I have a
> program that injects the same key into Kafka. However, I am unable to make
> this work correctly: every time Flink restarts, the state is empty, so the
> count starts from zero. What am I missing here?
>
> I am running this on a local environment (sbt run) with Flink 1.3.1, Java
> 1.8.0_131, and Ubuntu 16.04.

