Hi everyone,
I am trying to set up Flink with an HDFS state backend. I configured the state.backend and state.backend.fs.checkpointdir parameters in flink-conf.yaml. When I run the Flink job, the checkpoint directories are created in HDFS, so it appears Flink can connect and talk to HDFS just fine. Unfortunately, no files are ever created in those HDFS directories. I verified that state is being saved to and restored from task manager memory, and that works fine; it just never writes to HDFS.
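For reference, this is roughly what the relevant part of my flink-conf.yaml looks like (the HDFS path here is a placeholder, not my actual cluster address):

```yaml
# State backend configuration (path is a placeholder)
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs://namenode:9000/flink/checkpoints
```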
Am I missing a step? Do I need to do anything to force a write to HDFS? Does the state variable have to be of a particular type to work with HDFS?
This is what my snapshot functions look like:
override def restoreState(rState: scala.collection.mutable.HashMap[String, String]): Unit = {
  state = rState
}

override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): scala.collection.mutable.HashMap[String, String] = {
  state
}
Thanks!
-Jason
P.S. I am running Flink v1.0.1, Hadoop 2.7.1, and Scala 2.11