Hi,

I am trying to write a simple streaming program to count values from a Kafka topic in a fault-tolerant manner, like this (https://gist.github.com/victorpoluceno/8690df8459bf3afd60477f83ec78f7a8):

<code>
import java.util.Properties

import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object Job {
  def main(args: Array[String]): Unit = {
    // Filesystem state backend; checkpoint data goes to /tmp/flink
    val config: Configuration = new Configuration()
    config.setString(ConfigConstants.STATE_BACKEND, "filesystem")
    config.setString("state.backend.fs.checkpointdir", "file:///tmp/flink")

    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.enableCheckpointing(10) // checkpoint interval in milliseconds

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "test")

    // Running count per key, kept in Flink's keyed state
    val stream = env
      .addSource(new FlinkKafkaConsumer010[String]("test", new SimpleStringSchema(), properties))
      .map((_, 1))
      .keyBy(_._1)
      .mapWithState((in: (String, Int), count: Option[Int]) => {
        val newCount = in._2 + count.getOrElse(0)
        ((in._1, newCount), Some(newCount))
      })

    env.execute("Job")
  }
}
</code>

The idea is to use the filesystem state backend to persist the computation state (the count) and to restore it in case of failure or restart. I have a program that injects the same key into Kafka. But I can't get Flink to work correctly: every time Flink restarts, the value from state is empty, so the count starts from zero. What am I missing here?

I am running this in a local environment (sbt run) with Flink 1.3.1, Java 1.8.0_131, and Ubuntu 16.04.

-- hooray!
-- Victor Godoy Poluceno
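The symptom can be reproduced without Flink at all. The following is a hypothetical plain-Scala model, not Flink code and not how Flink implements state: it threads one Option[Int] per key through the same counting function the question passes to mapWithState, and the initial state map stands in for whatever a job starts with (empty for a freshly submitted job, or the contents of a snapshot when restoring). The names MapWithStateModel, countFn and run are made up for illustration.

```scala
// Hypothetical plain-Scala model of mapWithState's per-key state handling.
// No Flink dependency; the Map stands in for Flink's keyed state.
object MapWithStateModel {
  // Same shape as the function passed to mapWithState in the question:
  // (input, current state) => (output, new state)
  val countFn: ((String, Int), Option[Int]) => ((String, Int), Option[Int]) =
    (in, count) => {
      val newCount = in._2 + count.getOrElse(0)
      ((in._1, newCount), Some(newCount))
    }

  // Feeds each key as (key, 1) through countFn, reading and writing the state
  // for that key only. `initial` is the state the "job" starts with.
  def run(keys: Seq[String], initial: Map[String, Int]): (Seq[(String, Int)], Map[String, Int]) =
    keys.foldLeft((Vector.empty[(String, Int)], initial)) { case ((out, state), key) =>
      val (result, newState) = countFn((key, 1), state.get(key))
      // None clears the key's state, Some(s) stores s
      val nextState = newState.fold(state - key)(s => state.updated(key, s))
      (out :+ result, nextState)
    }

  def main(args: Array[String]): Unit = {
    // First run: a fresh job, nothing to restore.
    val (first, snapshot) = run(Seq("a", "a", "a"), Map.empty)
    println(first)    // Vector((a,1), (a,2), (a,3))

    // Restart without restoring: the count starts from zero again.
    val (fresh, _) = run(Seq("a"), Map.empty)
    println(fresh)    // Vector((a,1))

    // Restart that restores the last snapshot: the count continues.
    val (restored, _) = run(Seq("a"), snapshot)
    println(restored) // Vector((a,4))
  }
}
```

Only the third run continues counting, because only it starts from previously saved state rather than from an empty map.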
Hi Victor,
From a quick look at your code, I think you set everything up correctly (I'm not too familiar with Scala, though), so the problem is probably somewhere else. As [1] states (a bit hidden, maybe), checkpoints are only used to recover from failures, e.g. if you run your job on 2 task managers and one of them dies. In that case, Flink's job manager will try to reschedule the job and restart it from the latest checkpoint. Stopping your program and starting it again (sbt run) is not such a recovery: the job is submitted anew and starts with empty state. What you want is probably a savepoint [2] (or an externalized checkpoint, described in [1]), which lets you restore your program manually when it starts. If you run your program in a "real" Flink environment, started with one of our startup scripts, you can go straight to https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html#operations to see how to create savepoints and restore from them.

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/checkpoints.html
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html

On Tuesday, 25 July 2017 14:45:49 CEST Victor Godoy Poluceno wrote:
> Hi,
>
> I am trying to write a simple streaming program to count values from a
> Kafka topic in a fault tolerant manner [...]
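For the externalized-checkpoint route, here is a minimal sketch of the environment setup, assuming the Flink 1.3 Scala API. It keeps the filesystem state backend and path from the question, checkpoints every 1000 ms instead of every 10 ms (the interval is an illustrative choice), and asks Flink to keep the latest checkpoint when the job is cancelled. It assumes the cluster's flink-conf.yaml sets state.checkpoints.dir, the directory where externalized checkpoint metadata is written. Per the checkpoints page [1], a retained checkpoint is resumed like a savepoint, by passing its metadata path to bin/flink run -s on a cluster started with Flink's scripts rather than through sbt run. The object name CheckpointSetup is hypothetical; the Kafka pipeline would be built on the returned environment.

```scala
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala._

// Sketch for Flink 1.3 (assumption): environment setup only; the Kafka
// source and mapWithState pipeline from the question go on top of `env`.
object CheckpointSetup {
  def createEnv(): StreamExecutionEnvironment = {
    // Cluster environment when submitted with bin/flink, local one otherwise.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Filesystem state backend, same location as in the question.
    env.setStateBackend(new FsStateBackend("file:///tmp/flink"))

    // Checkpoint every 1000 ms (illustrative interval), exactly-once.
    env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE)

    // Keep the latest checkpoint when the job is cancelled so it can be
    // resumed with: bin/flink run -s <checkpointMetaDataPath> ...
    // Assumes state.checkpoints.dir is set in the cluster's flink-conf.yaml.
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    env
  }
}
```

RETAIN_ON_CANCELLATION is chosen here because stopping the job on purpose is exactly the case in which regular checkpoints would otherwise be discarded; DELETE_ON_CANCELLATION keeps them only if the job fails.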