CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.


Hao Sun
Hi, I am running Flink in dev mode through IntelliJ. My flink-conf.yaml is correctly configured, and from the log you can see that the configuration is being loaded:

2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/

But I still get this error:
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


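My program has only the following related to checkpointing:
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.enableCheckpointing(2 * 60 * 1000)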

I need some help digging into this. Thanks.

=================== Full log =================

2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not contain a setter for field events
2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid POJO type because not all fields are valid POJO fields.
2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a setter for field accountId
2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO type because not all fields are valid POJO fields.
2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:52.198 [main] INFO  o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
2017-09-25 20:41:52.253 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.address, localhost
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 6123
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.port, 8081
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.heap.mb, 1024
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.heap.mb, 1024
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.memory.preallocate, false
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: parallelism.default, 1
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: fs.hdfs.hadoopconf, flink/conf
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.log.path, /tmp/flink_logs/flink_console.log
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.log.path, /tmp/flink_logs/flink_console.log
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability, zookeeper
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.quorum, 172.18.0.7:2181
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.root, /flink
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.cluster-id, /flink_default_ns
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.storageDir, /tmp/flink/ha-recovery
2017-09-25 20:41:52.257 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Disabled queryable state server
2017-09-25 20:41:52.271 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Starting FlinkMiniCluster.
2017-09-25 20:41:52.442 [flink-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
2017-09-25 20:41:52.472 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Created BLOB server storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-af52ff47-49d3-4307-bb4e-016c5e5d8a39
2017-09-25 20:41:52.477 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Started BLOB server at 0.0.0.0:56706 - max concurrent requests: 50 - max backlog: 1000
2017-09-25 20:41:52.487 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
2017-09-25 20:41:52.496 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist  - Started memory archivist akka://flink/user/archive_1
2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Starting JobManager at akka://flink/user/jobmanager_1.
2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@5b4d1f99 @ akka://flink/user/jobmanager_1
2017-09-25 20:41:52.508 [main] INFO  o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
2017-09-25 20:41:52.514 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Temporary file directory '/var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T': total 464 GB, usable 61 GB (13.15% usable)
2017-09-25 20:41:52.590 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID Some(d2f1c68f-2982-474f-8b7f-271d3f4e4192).
2017-09-25 20:41:52.592 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=d2f1c68f-2982-474f-8b7f-271d3f4e4192
2017-09-25 20:41:52.602 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
2017-09-25 20:41:52.610 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1948249729] - leader session d2f1c68f-2982-474f-8b7f-271d3f4e4192
2017-09-25 20:41:52.899 [main] INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 363 MB for network buffer pool (number of memory segments: 11620, bytes per segment: 32768).
2017-09-25 20:41:52.915 [main] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting the network environment and its components.
2017-09-25 20:41:52.917 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Limiting managed memory to 1145 MB, memory will be allocated lazily.
2017-09-25 20:41:52.922 [main] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6 for spill files.
2017-09-25 20:41:52.923 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
2017-09-25 20:41:52.963 [main] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-a99429d5-981d-4cd0-9eeb-5d6678c650f0
2017-09-25 20:41:52.973 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-dd359949-3837-461c-accc-41ebc67a1d8f
2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Starting TaskManager actor at akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager data connection information: d7308d8350e736f55357e74e04f5c106 @ localhost (dataPort=-1)
2017-09-25 20:41:52.975 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager has 8 task slot(s).
2017-09-25 20:41:52.976 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage stats: [HEAP: 385/684/3641 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
2017-09-25 20:41:52.980 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
2017-09-25 20:41:52.982 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - TaskManager d7308d8350e736f55357e74e04f5c106 has started.
2017-09-25 20:41:52.984 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 8ad7f698f504d459202cdb8a9d6a9b34. Current number of registered hosts is 1. Current number of alive task slots is 8.
2017-09-25 20:41:52.987 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
2017-09-25 20:41:52.989 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Determined BLOB server address to be localhost/127.0.0.1:56706. Starting BLOB cache.
2017-09-25 20:41:52.990 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-196b0de7-b951-4074-8865-a138283e66c1
2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received SubmitJobAndWait(JobGraph(jobId: 0f0d880310bc9098027c2e4877f999fb)) but there is no connection to a JobManager yet.
2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
2017-09-25 20:41:53.000 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager null.
2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1948249729] with leader session id d2f1c68f-2982-474f-8b7f-271d3f4e4192.
2017-09-25 20:41:53.004 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb) and wait for progress
2017-09-25 20:41:53.005 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload jar files to job manager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.007 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit job to the job manager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.009 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Submitting job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development).
2017-09-25 20:41:53.014 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=1, delayBetweenRestartAttempts=10000) for 0f0d880310bc9098027c2e4877f999fb.
2017-09-25 20:41:53.025 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via failover strategy: full graph restart
2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Running initialization on master for job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Successfully ran initialization on master in 0 ms.
2017-09-25 20:41:53.071 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - No state backend has been configured, using default state backend (Memory / JobManager)
2017-09-25 20:41:53.078 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.jobmanager.JobManager  - Failed to submit job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development)
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Terminate JobClientActor.
2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.client.JobClient  - Job execution failed
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Stopping FlinkMiniCluster.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Stopping JobManager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from JobManager
2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Shutting down BlobCache
2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.blob.BlobServer  - Stopped BLOB server at 0.0.0.0:56706
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed spill file directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the network environment and its components.
2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Task manager akka://flink/user/taskmanager_1 is completely shut down.

Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Aljoscha Krettek
Hi,

I think the GlobalConfiguration is not necessarily read by the (local) JobManager. You could try using StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to manually specify a configuration.

Best,
Aljoscha
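
For readers who want to try this suggestion, here is a minimal sketch of what it might look like in Scala. It assumes a Flink version where the Java-API method StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) is available and wraps the result in the Scala environment. The object name LocalCheckpointSetup and the helpers localConfig and localEnv are hypothetical names for illustration. The directory value is copied from the flink-conf.yaml lines in the log above. Whether this alone resolves the error may depend on the Flink version in use.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper object, not part of the original program.
object LocalCheckpointSetup {

  // Build the configuration that is handed explicitly to the local
  // (embedded) cluster instead of relying on flink-conf.yaml being picked up.
  def localConfig(): Configuration = {
    val conf = new Configuration()
    // Key named in the exception message; value taken from the posted log.
    conf.setString("state.checkpoints.dir", "/tmp/flink/checkpoints-meta/")
    conf
  }

  // Create a local environment with the explicit configuration and the
  // same checkpoint settings as the original program.
  def localEnv(parallelism: Int): StreamExecutionEnvironment = {
    val javaEnv = JavaEnv.createLocalEnvironment(parallelism, localConfig())
    val env = new StreamExecutionEnvironment(javaEnv)
    env.enableCheckpointing(2 * 60 * 1000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env
  }
}
```

In the job's main method, val env = LocalCheckpointSetup.localEnv(1) would then take the place of StreamExecutionEnvironment.getExecutionEnvironment for runs inside the IDE. The other keys from flink-conf.yaml (for example state.backend) could be set on the same Configuration object in the same way.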

On 26. Sep 2017, at 05:49, Hao Sun <[hidden email]> wrote:

Hi I am running flink in dev mode through Intellij, I have flink-conf.yaml correctly configured and from the log you can see job manager is reading it.

2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/

But I still somehow get this error
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


My program only has this related to checkpointing
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.enableCheckpointing(2 * 60 * 1000)

Need some help to dig through this. Thanks

=================== Full log =================

2017-09-25 20:41:51.466 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents does not contain a setter for field events
2017-09-25 20:41:51.946 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFilterEvents is not a valid POJO type because not all fields are valid POJO fields.
2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent does not contain a setter for field accountId
2017-09-25 20:41:51.985 [main] INFO  org.apache.flink.api.java.typeutils.TypeExtractor  - class com.zendesk.fraud_prevention.data_types.MaxwellFPSEvent is not a valid POJO type because not all fields are valid POJO fields.
2017-09-25 20:41:52.017 [ForkJoinPool-1-worker-13] INFO  com.zendesk.consul.Consul  - Collecting kafka nodes from Consul(consul.docker:8500) for tags=List(dev)
2017-09-25 20:41:52.198 [main] INFO  o.a.flink.streaming.api.environment.LocalStreamEnvironment  - Running job on local embedded Flink mini cluster
2017-09-25 20:41:52.253 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.address, localhost
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.rpc.port, 6123
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.port, 8081
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.heap.mb, 1024
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.heap.mb, 1024
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.memory.preallocate, false
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: parallelism.default, 1
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: fs.hdfs.hadoopconf, flink/conf
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.log.path, /tmp/flink_logs/flink_console.log
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.log.path, /tmp/flink_logs/flink_console.log
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability, zookeeper
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.quorum, 172.18.0.7:2181
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.root, /flink
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.cluster-id, /flink_default_ns
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.storageDir, /tmp/flink/ha-recovery
2017-09-25 20:41:52.257 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Disabled queryable state server
2017-09-25 20:41:52.271 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Starting FlinkMiniCluster.
2017-09-25 20:41:52.442 [flink-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
2017-09-25 20:41:52.472 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Created BLOB server storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-af52ff47-49d3-4307-bb4e-016c5e5d8a39
2017-09-25 20:41:52.477 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Started BLOB server at 0.0.0.0:56706 - max concurrent requests: 50 - max backlog: 1000
2017-09-25 20:41:52.487 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
2017-09-25 20:41:52.496 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist  - Started memory archivist <a href="akka://flink/user/archive_1" class="">akka://flink/user/archive_1
2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Starting JobManager at <a href="akka://flink/user/jobmanager_1" class="">akka://flink/user/jobmanager_1.
2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@5b4d1f99 @ <a href="akka://flink/user/jobmanager_1" class="">akka://flink/user/jobmanager_1
2017-09-25 20:41:52.508 [main] INFO  o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
2017-09-25 20:41:52.514 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Temporary file directory '/var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T': total 464 GB, usable 61 GB (13.15% usable)
2017-09-25 20:41:52.590 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID Some(d2f1c68f-2982-474f-8b7f-271d3f4e4192).
2017-09-25 20:41:52.592 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=d2f1c68f-2982-474f-8b7f-271d3f4e4192
2017-09-25 20:41:52.602 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
2017-09-25 20:41:52.610 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1948249729] - leader session d2f1c68f-2982-474f-8b7f-271d3f4e4192
2017-09-25 20:41:52.899 [main] INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 363 MB for network buffer pool (number of memory segments: 11620, bytes per segment: 32768).
2017-09-25 20:41:52.915 [main] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting the network environment and its components.
2017-09-25 20:41:52.917 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Limiting managed memory to 1145 MB, memory will be allocated lazily.
2017-09-25 20:41:52.922 [main] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6 for spill files.
2017-09-25 20:41:52.923 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
2017-09-25 20:41:52.963 [main] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-a99429d5-981d-4cd0-9eeb-5d6678c650f0
2017-09-25 20:41:52.973 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-dd359949-3837-461c-accc-41ebc67a1d8f
2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Starting TaskManager actor at akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager data connection information: d7308d8350e736f55357e74e04f5c106 @ localhost (dataPort=-1)
2017-09-25 20:41:52.975 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager has 8 task slot(s).
2017-09-25 20:41:52.976 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage stats: [HEAP: 385/684/3641 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
2017-09-25 20:41:52.980 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
2017-09-25 20:41:52.982 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - TaskManager d7308d8350e736f55357e74e04f5c106 has started.
2017-09-25 20:41:52.984 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 8ad7f698f504d459202cdb8a9d6a9b34. Current number of registered hosts is 1. Current number of alive task slots is 8.
2017-09-25 20:41:52.987 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
2017-09-25 20:41:52.989 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Determined BLOB server address to be localhost/127.0.0.1:56706. Starting BLOB cache.
2017-09-25 20:41:52.990 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-196b0de7-b951-4074-8865-a138283e66c1
2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received SubmitJobAndWait(JobGraph(jobId: 0f0d880310bc9098027c2e4877f999fb)) but there is no connection to a JobManager yet.
2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
2017-09-25 20:41:53.000 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager null.
2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1948249729] with leader session id d2f1c68f-2982-474f-8b7f-271d3f4e4192.
2017-09-25 20:41:53.004 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb) and wait for progress
2017-09-25 20:41:53.005 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload jar files to job manager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.007 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit job to the job manager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.009 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Submitting job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development).
2017-09-25 20:41:53.014 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=1, delayBetweenRestartAttempts=10000) for 0f0d880310bc9098027c2e4877f999fb.
2017-09-25 20:41:53.025 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via failover strategy: full graph restart
2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Running initialization on master for job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Successfully ran initialization on master in 0 ms.
2017-09-25 20:41:53.071 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - No state backend has been configured, using default state backend (Memory / JobManager)
2017-09-25 20:41:53.078 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.jobmanager.JobManager  - Failed to submit job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development)
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Terminate JobClientActor.
2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.client.JobClient  - Job execution failed
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Stopping FlinkMiniCluster.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Stopping JobManager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from JobManager
2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Shutting down BlobCache
2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.blob.BlobServer  - Stopped BLOB server at 0.0.0.0:56706
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed spill file directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the network environment and its components.
2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Task manager akka://flink/user/taskmanager_1 is completely shut down.


Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Hao Sun
Thanks Aljoscha, I still have some questions.
Do I have to parse the YAML into a Configuration object myself? If the JM is not reading the config, who is reading it? According to the logs, the thread is [main].
Why doesn't the JM read the config file by default?
def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
    StreamExecutionEnvironment = {
  new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
}

@PublicEvolving
def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
  val conf: Configuration = if (config == null) new Configuration() else config
  new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
}

On Tue, Sep 26, 2017 at 6:25 AM Aljoscha Krettek <[hidden email]> wrote:
Hi,

I think the GlobalConfiguration is not necessarily read by the (local) JobManager. You could try using StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to manually specify a configuration.

Best,
Aljoscha
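
For reference, a minimal sketch of what that suggestion could look like from Scala. The Scala signatures quoted above show no createLocalEnvironment(Int, Configuration) overload, so this sketch wraps the Java environment the same way those methods do. The object name, the helper checkpointConf, the parallelism of 1, and the file:// form of the directory are assumptions for illustration; it has not been confirmed in this thread that this resolves the error.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical object name, used only for this sketch.
object LocalCheckpointDirSketch {

  // Build a Configuration by hand that carries only the key named in the
  // exception message. The directory mirrors the one in the posted log;
  // the file:// scheme is an assumption.
  def checkpointConf(): Configuration = {
    val conf = new Configuration()
    conf.setString("state.checkpoints.dir", "file:///tmp/flink/checkpoints-meta/")
    conf
  }

  def main(args: Array[String]): Unit = {
    // Wrap the Java local environment, passing the Configuration explicitly,
    // instead of relying on flink-conf.yaml being picked up by the local JobManager.
    val env = new StreamExecutionEnvironment(
      JavaEnv.createLocalEnvironment(1, checkpointConf()))

    // Same checkpoint settings as in the original program.
    env.enableCheckpointing(2 * 60 * 1000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // Sources, sinks and env.execute(...) would follow here as in the real job.
  }
}
```

GlobalConfiguration.loadConfiguration(configDir) can also produce a Configuration from a conf directory without hand-parsing the YAML, but it would presumably carry over every setting in that file, including the high-availability ZooKeeper entries visible in the log, which may not be wanted for a local run.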

2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.client.JobClient  - Job execution failed
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Stopping FlinkMiniCluster.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Stopping JobManager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from JobManager
2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Shutting down BlobCache
2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.blob.BlobServer  - Stopped BLOB server at 0.0.0.0:56706
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed spill file directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the network environment and its components.
2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Task manager akka://flink/user/taskmanager_1 is completely shut down.

Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Aljoscha Krettek
I'm not sure whether the JM is reading it or not. But you can manually set the values on the Configuration using the setter methods.
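For reference, here is a minimal sketch of what setting the value by hand could look like from Scala. It assumes the Java API's `StreamExecutionEnvironment.createLocalEnvironment(int, Configuration)` mentioned elsewhere in this thread, wrapped in the Scala environment's public constructor. The object and method names are made up for illustration, and the path is simply the one from the flink-conf.yaml shown in the log. I have not verified this against every Flink version.

```scala
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

// Hypothetical helper object; the names are illustrative, not part of Flink.
object LocalCheckpointDirExample {

  // Set the key named in the exception message directly on a Configuration.
  def localConfiguration(): Configuration = {
    val conf = new Configuration()
    conf.setString("state.checkpoints.dir", "/tmp/flink/checkpoints-meta/")
    conf
  }

  // Hand the Configuration to the local environment explicitly instead of
  // relying on flink-conf.yaml being picked up by the embedded JobManager.
  def createEnvironment(parallelism: Int): StreamExecutionEnvironment = {
    val env = new StreamExecutionEnvironment(
      JavaEnv.createLocalEnvironment(parallelism, localConfiguration()))
    env.enableCheckpointing(2 * 60 * 1000)
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    env
  }
}
```

In the job, `LocalCheckpointDirExample.createEnvironment(1)` would then take the place of `StreamExecutionEnvironment.getExecutionEnvironment` for local runs only.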


On 26. Sep 2017, at 16:58, Hao Sun <[hidden email]> wrote:

Thanks Aljoscha, I still have questions.
Do I have to parse the YAML into a Configuration object myself? If the JM is not reading the config, what is reading it? The thread is [main] in the logs.
Why doesn't the JM read the config file by default?
def createLocalEnvironment(parallelism: Int = JavaEnv.getDefaultLocalParallelism):
    StreamExecutionEnvironment = {
  new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(parallelism))
}

@PublicEvolving
def createLocalEnvironmentWithWebUI(config: Configuration = null): StreamExecutionEnvironment = {
  val conf: Configuration = if (config == null) new Configuration() else config
  new StreamExecutionEnvironment(JavaEnv.createLocalEnvironmentWithWebUI(conf))
}
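On the question of parsing the YAML by hand: one possible route, sketched here as an assumption rather than a confirmed fix, is to let `GlobalConfiguration.loadConfiguration(configDir)` read flink-conf.yaml and then copy only the keys of interest into a fresh `Configuration`. The object name, the `load` method, and the default key list are hypothetical; the key itself is the one named in the exception message.

```scala
import org.apache.flink.configuration.{Configuration, GlobalConfiguration}

// Hypothetical helper; only GlobalConfiguration and Configuration are Flink classes.
object CheckpointSettingsFromYaml {

  // Reads flink-conf.yaml from confDir and copies the selected keys, if present.
  def load(
      confDir: String,
      keys: Seq[String] = Seq("state.checkpoints.dir")): Configuration = {
    val global = GlobalConfiguration.loadConfiguration(confDir)
    val conf = new Configuration()
    for {
      key <- keys
      value <- Option(global.getString(key, null)) // skip keys that are not set
    } conf.setString(key, value)
    conf
  }
}
```

The result could then be passed as the second argument of the Java API's `createLocalEnvironment(int, Configuration)`. Copying selected keys rather than the whole file is a deliberate choice in this sketch: it keeps unrelated entries, such as the high-availability settings visible in the log, out of the local mini cluster.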

On Tue, Sep 26, 2017 at 6:25 AM Aljoscha Krettek <[hidden email]> wrote:
Hi,

I think the GlobalConfiguration is not necessarily read by the (local) JobManager. You could try using StreamExecutionEnvironment.createLocalEnvironment(int, Configuration) to manually specify a configuration.

Best,
Aljoscha

On 26. Sep 2017, at 05:49, Hao Sun <[hidden email]> wrote:

Hi, I am running Flink in dev mode through IntelliJ. I have flink-conf.yaml correctly configured, and from the log you can see the job manager is reading it.

2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/

But I still somehow get this error
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.


The only checkpointing-related code in my program is:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getCheckpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
env.enableCheckpointing(2 * 60 * 1000)

I need some help digging into this. Thanks.



Re: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured.

Hao Sun
Thanks, I will try that.

On Tue, Sep 26, 2017 at 8:24 AM Aljoscha Krettek <[hidden email]> wrote:
I'm not sure whether the JM is reading it or not. But you can manually set the values on the Configuration using the setter methods.
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: parallelism.default, 1
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: fs.hdfs.hadoopconf, flink/conf
2017-09-25 20:41:52.255 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend, rocksdb
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.backend.fs.checkpointdir, /tmp/flink/checkpoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.checkpoints.dir, /tmp/flink/checkpoints-meta/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: state.savepoints.dir, /tmp/flink/savepoints/
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: taskmanager.log.path, /tmp/flink_logs/flink_console.log
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: jobmanager.web.log.path, /tmp/flink_logs/flink_console.log
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability, zookeeper
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.quorum, 172.18.0.7:2181
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.root, /flink
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.path.cluster-id, /flink_default_ns
2017-09-25 20:41:52.256 [main] INFO  org.apache.flink.configuration.GlobalConfiguration  - Loading configuration property: high-availability.zookeeper.storageDir, /tmp/flink/ha-recovery
2017-09-25 20:41:52.257 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Disabled queryable state server
2017-09-25 20:41:52.271 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Starting FlinkMiniCluster.
2017-09-25 20:41:52.442 [flink-akka.actor.default-dispatcher-4] INFO  akka.event.slf4j.Slf4jLogger  - Slf4jLogger started
2017-09-25 20:41:52.472 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Created BLOB server storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-af52ff47-49d3-4307-bb4e-016c5e5d8a39
2017-09-25 20:41:52.477 [main] INFO  org.apache.flink.runtime.blob.BlobServer  - Started BLOB server at 0.0.0.0:56706 - max concurrent requests: 50 - max backlog: 1000
2017-09-25 20:41:52.487 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
2017-09-25 20:41:52.496 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.MemoryArchivist  - Started memory archivist akka://flink/user/archive_1
2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Starting JobManager at akka://flink/user/jobmanager_1.
2017-09-25 20:41:52.501 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Proposing leadership to contender org.apache.flink.runtime.jobmanager.JobManager@5b4d1f99 @ akka://flink/user/jobmanager_1
2017-09-25 20:41:52.508 [main] INFO  o.apache.flink.runtime.taskexecutor.TaskManagerConfiguration  - Messages have a max timeout of 10000 ms
2017-09-25 20:41:52.514 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Temporary file directory '/var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T': total 464 GB, usable 61 GB (13.15% usable)
2017-09-25 20:41:52.590 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.jobmanager.JobManager  - JobManager akka://flink/user/jobmanager_1 was granted leadership with leader session ID Some(d2f1c68f-2982-474f-8b7f-271d3f4e4192).
2017-09-25 20:41:52.592 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.h.nonha.embedded.EmbeddedLeaderService  - Received confirmation of leadership for leader akka://flink/user/jobmanager_1 , session=d2f1c68f-2982-474f-8b7f-271d3f4e4192
2017-09-25 20:41:52.602 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Trying to associate with JobManager leader akka://flink/user/jobmanager_1
2017-09-25 20:41:52.610 [flink-akka.actor.default-dispatcher-3] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - Resource Manager associating with leading JobManager Actor[akka://flink/user/jobmanager_1#-1948249729] - leader session d2f1c68f-2982-474f-8b7f-271d3f4e4192
2017-09-25 20:41:52.899 [main] INFO  org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated 363 MB for network buffer pool (number of memory segments: 11620, bytes per segment: 32768).
2017-09-25 20:41:52.915 [main] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Starting the network environment and its components.
2017-09-25 20:41:52.917 [main] INFO  org.apache.flink.runtime.taskexecutor.TaskManagerServices  - Limiting managed memory to 1145 MB, memory will be allocated lazily.
2017-09-25 20:41:52.922 [main] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6 for spill files.
2017-09-25 20:41:52.923 [main] INFO  org.apache.flink.runtime.metrics.MetricRegistry  - No metrics reporter configured, no metrics will be exposed/reported.
2017-09-25 20:41:52.963 [main] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-a99429d5-981d-4cd0-9eeb-5d6678c650f0
2017-09-25 20:41:52.973 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.filecache.FileCache  - User file cache uses directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-dist-cache-dd359949-3837-461c-accc-41ebc67a1d8f
2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Starting TaskManager actor at akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:52.974 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager data connection information: d7308d8350e736f55357e74e04f5c106 @ localhost (dataPort=-1)
2017-09-25 20:41:52.975 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - TaskManager has 8 task slot(s).
2017-09-25 20:41:52.976 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Memory usage stats: [HEAP: 385/684/3641 MB, NON HEAP: 35/36/-1 MB (used/committed/max)]
2017-09-25 20:41:52.980 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Trying to register at JobManager akka://flink/user/jobmanager_1 (attempt 1, timeout: 500 milliseconds)
2017-09-25 20:41:52.982 [flink-akka.actor.default-dispatcher-2] INFO  o.a.f.r.c.standalone.StandaloneResourceManager  - TaskManager d7308d8350e736f55357e74e04f5c106 has started.
2017-09-25 20:41:52.984 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.instance.InstanceManager  - Registered TaskManager at localhost (akka://flink/user/taskmanager_1) as 8ad7f698f504d459202cdb8a9d6a9b34. Current number of registered hosts is 1. Current number of alive task slots is 8.
2017-09-25 20:41:52.987 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Successful registration at JobManager (akka://flink/user/jobmanager_1), starting network stack and library cache.
2017-09-25 20:41:52.989 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Determined BLOB server address to be localhost/127.0.0.1:56706. Starting BLOB cache.
2017-09-25 20:41:52.990 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Created BLOB cache storage directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/blobStore-196b0de7-b951-4074-8865-a138283e66c1
2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received SubmitJobAndWait(JobGraph(jobId: 0f0d880310bc9098027c2e4877f999fb)) but there is no connection to a JobManager yet.
2017-09-25 20:41:52.999 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Received job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
2017-09-25 20:41:53.000 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager null.
2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connect to JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.003 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1948249729] with leader session id d2f1c68f-2982-474f-8b7f-271d3f4e4192.
2017-09-25 20:41:53.004 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Sending message to JobManager akka://flink/user/jobmanager_1 to submit job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb) and wait for progress
2017-09-25 20:41:53.005 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Upload jar files to job manager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.007 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Submit job to the job manager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.009 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Submitting job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development).
2017-09-25 20:41:53.014 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=1, delayBetweenRestartAttempts=10000) for 0f0d880310bc9098027c2e4877f999fb.
2017-09-25 20:41:53.025 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job recovers via failover strategy: full graph restart
2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Running initialization on master for job Kafka 0.10 Example development (0f0d880310bc9098027c2e4877f999fb).
2017-09-25 20:41:53.055 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Successfully ran initialization on master in 0 ms.
2017-09-25 20:41:53.071 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.jobmanager.JobManager  - No state backend has been configured, using default state backend (Memory / JobManager)
2017-09-25 20:41:53.078 [flink-akka.actor.default-dispatcher-2] ERROR org.apache.flink.runtime.jobmanager.JobManager  - Failed to submit job 0f0d880310bc9098027c2e4877f999fb (Kafka 0.10 Example development)
java.lang.IllegalStateException: CheckpointConfig says to persist periodic checkpoints, but no checkpoint directory has been configured. You can configure configure one via key 'state.checkpoints.dir'.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:209)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:451)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:278)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Terminate JobClientActor.
2017-09-25 20:41:53.085 [flink-akka.actor.default-dispatcher-3] INFO  org.apache.flink.runtime.client.JobSubmissionClientActor  - Disconnect from JobManager Actor[akka://flink/user/jobmanager_1#-1948249729].
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.client.JobClient  - Job execution failed
2017-09-25 20:41:53.086 [main] INFO  org.apache.flink.runtime.minicluster.FlinkMiniCluster  - Stopping FlinkMiniCluster.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Stopping TaskManager akka://flink/user/taskmanager_1#1248014944.
2017-09-25 20:41:53.090 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.jobmanager.JobManager  - Stopping JobManager akka://flink/user/jobmanager_1.
2017-09-25 20:41:53.091 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Disassociating from JobManager
2017-09-25 20:41:53.093 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.blob.BlobCache  - Shutting down BlobCache
2017-09-25 20:41:53.103 [flink-akka.actor.default-dispatcher-4] INFO  org.apache.flink.runtime.blob.BlobServer  - Stopped BLOB server at 0.0.0.0:56706
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.disk.iomanager.IOManager  - I/O manager removed spill file directory /var/folders/37/6tz6x9x97d19_ltcm_1tl_f00000gp/T/flink-io-698c06ae-3d56-4765-b9b8-8a5d8b70ffd6
2017-09-25 20:41:53.104 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.io.network.NetworkEnvironment  - Shutting down the network environment and its components.
2017-09-25 20:41:53.122 [flink-akka.actor.default-dispatcher-2] INFO  org.apache.flink.runtime.taskmanager.TaskManager  - Task manager akka://flink/user/taskmanager_1 is completely shut down.