Checkpointing in Flink 1.5.0

James Isaac
The Flink documentation says that we need to specify the file system scheme (file://, hdfs://) when configuring the RocksDB backend directory:
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#the-rocksdbstatebackend

But when I do this, I get an error on job submission saying that relative paths are not supported by the RocksDB state backend.
I am submitting the job via the Flink CLI (bin/flink run).

Also, although "file:///home/abc/share" is a local file system path, it is a shared GlusterFS volume mount, so it is accessible to the JobManager and all TaskManagers.

When I removed the file system scheme from the RocksDB backend directory configuration, the job was submitted, but the RocksDB checkpoint directory was not created.
I have enabled checkpointing in my Flink application.

I am using Flink 1.5.0.

Any help or pointers would be appreciated.
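A minimal sketch of the check the error message implies, assuming the RocksDB local storage directory only needs to resolve to an absolute local path and that a `file://` prefix should be accepted and stripped. `LocalDirCheck` and `toLocalDir` are hypothetical names for illustration, not Flink's API, and the sketch assumes a Unix-like file system.

```java
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helper, NOT Flink's implementation: resolves a configured
// RocksDB local directory to an absolute local path.
public class LocalDirCheck {

    static File toLocalDir(String raw) {
        String path = raw;
        try {
            URI uri = new URI(raw);
            if (uri.getScheme() != null) {
                // Accept only the local "file" scheme and keep its path part.
                if (!"file".equalsIgnoreCase(uri.getScheme())) {
                    throw new IllegalArgumentException("Non-local scheme: " + raw);
                }
                path = uri.getPath();
                if (path == null) {
                    // Opaque URIs such as "file:abc" carry no hierarchical path.
                    throw new IllegalArgumentException("No path in URI: " + raw);
                }
            }
        } catch (URISyntaxException e) {
            // Not parseable as a URI; treat the value as a plain file system path.
        }
        File dir = new File(path);
        if (!dir.isAbsolute()) {
            throw new IllegalArgumentException("Relative paths are not supported: " + raw);
        }
        return dir;
    }

    public static void main(String[] args) {
        // Both forms resolve to the same absolute path on a Unix-like system.
        System.out.println(toLocalDir("file:///home/abc/share"));
        System.out.println(toLocalDir("/home/abc/share"));
    }
}
```

With this check, `file:///home/abc/share` and `/home/abc/share` resolve to the same directory, while a relative value such as `rocksdb` is rejected.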

Re: Checkpointing in Flink 1.5.0

Chesnay Schepler
This doesn't sound like intended behavior. Can you give us the stack trace?

On 03.07.2018 13:17, Data Engineer wrote:

>  The Flink documentation says that we need to specify the filesystem
> type (file://, hdfs://) when configuring the rocksdb backend dir.
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#the-rocksdbstatebackend
>
> But when I do this, I get an error on job submission saying that
> relative paths are not permitted in the RocksDB state backend.
> I am submitting the job via flink cli (bin/flink run).
>
> Also, even though I give a local file system path
> "file:///home/abc/share", it is a shared GlusterFS volume mount, so it
> will be accessible by the JobManager and all TaskManagers.
>
> I removed the filesystem type from the rocksdb backend dir
> configuration, and though the job got submitted, the rocksdb
> checkpoint directory was not created.
> I have enabled checkpointing in my Flink application.
>
> I am using Flink 1.5.0.
>
> Any help or pointers would be appreciated.



Re: Checkpointing in Flink 1.5.0

James Isaac
2018-07-03 11:30:35,703 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting StandaloneSessionClusterEntrypoint (Version: <unknown>, Rev:c61b108, Date:24.05.2018 @ 16:54:44 CEST)
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: flink
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b10
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 981 MiBytes
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /etc/alternatives/jre_openjdk/
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  No Hadoop Dependency available
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xms1024m
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx1024m
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog.file=/opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:/opt/flink-1.5.0/conf/log4j.properties
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:/opt/flink-1.5.0/conf/logback.xml
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments:
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --configDir
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     /opt/flink-1.5.0/conf
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --executionMode
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --host
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: /opt/flink-1.5.0/lib/flink-cep_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-connectors-1.5.0.jar:/opt/flink-1.5.0/lib/flink-gelly_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-ml_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-table_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/log4j-1.2.17.jar:/opt/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-07-03 11:30:35,712 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 4124
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, myfl-flink-jobmanager
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 4123
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.rpc.port, 4122
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.data.port, 4121
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.query.port, 4125
2018-07-03 11:30:35,722 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.address, myfl-flink-jobmanager-ui
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, rocksdb
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, file:///opt/flink/share/myfl-flink/checkpoints/ext_checkpoints
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.fs.checkpointdir, file:///opt/flink/share/myfl-flink/checkpoints/fs_state
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.checkpointdir, file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy, fixed-delay
2018-07-03 11:30:35,764 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.fixed-delay.attempts, 100
2018-07-03 11:30:35,764 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.fixed-delay.delay, 1 s
2018-07-03 11:30:35,885 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Starting StandaloneSessionClusterEntrypoint.
2018-07-03 11:30:35,885 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install default filesystem.
2018-07-03 11:30:35,892 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2018-07-03 11:30:35,963 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install security context.
2018-07-03 11:30:35,970 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,988 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,989 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
2018-07-03 11:30:36,003 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Trying to start actor system at myfl-flink-jobmanager:4123
2018-07-03 11:30:37,288 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2018-07-03 11:30:37,396 INFO  akka.remote.Remoting                                          - Starting remoting
2018-07-03 11:30:37,583 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@myfl-flink-jobmanager:4123]
2018-07-03 11:30:37,591 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@myfl-flink-jobmanager:4123
2018-07-03 11:30:37,611 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-e445bc66-cee3-4a3d-b810-74df02627eca
2018-07-03 11:30:37,613 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:4124 - max concurrent requests: 50 - max backlog: 1000
2018-07-03 11:30:37,629 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
2018-07-03 11:30:37,664 INFO  org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore  - Initializing FileArchivedExecutionGraphStore: Storage directory /tmp/executionGraphStore-4ff546b1-4bfb-4911-9314-89c61d7e7149, expiration time 3600000, maximum cache size 52428800 bytes.
2018-07-03 11:30:37,694 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /tmp/blobStore-7e0efdb8-f70b-42ed-9387-c0e1b8090b36
2018-07-03 11:30:37,702 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Upload directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.
2018-07-03 11:30:37,703 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Created directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload for file uploads.
2018-07-03 11:30:37,706 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component log file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component stdout file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.out
2018-07-03 11:30:38,567 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at myfl-flink-jobmanager-ui:8081
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://myfl-flink-jobmanager-ui:8081 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://myfl-flink-jobmanager-ui:8081.
2018-07-03 11:30:38,578 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2018-07-03 11:30:38,966 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2018-07-03 11:30:39,068 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@myfl-flink-jobmanager:4123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,069 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2018-07-03 11:30:39,164 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@myfl-flink-jobmanager:4123/user/dispatcher was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,165 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
2018-07-03 11:30:39,682 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Replacing old registration of TaskExecutor 068c693b9585900f68c53b00507ee889.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister TaskManager 8a5cee3aa38081030dc8558ac477d3b3 from the SlotManager.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The target with resource ID 068c693b9585900f68c53b00507ee889 is already been monitored.
2018-07-03 11:30:39,770 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager 068c693b9585900f68c53b00507ee889 under 03d409e5166fad4f4082b6165eb0de2e at the SlotManager.
2018-07-03 11:34:20,257 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job b684656d9afd75cc384a7bcd071bf55e (CSV Files Read -> CSV to Avro encode -> Kafka publish).
2018-07-03 11:34:20,269 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2018-07-03 11:34:20,278 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,285 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,289 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/67ceb2ae-1cb1-44be-a09e-601032e23fb5 .
2018-07-03 11:34:20,481 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-07-03 11:34:20,562 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,563 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 0 ms.
2018-07-03 11:34:20,580 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2018-07-03 11:34:20,590 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'
2018-07-03 11:34:20,592 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler    - Implementation error: Unhandled exception.
org.apache.flink.util.FlinkException: Failed to submit job b684656d9afd75cc384a7bcd071bf55e.
	at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
	at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
	at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
	at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
	at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
	at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
	... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
	at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
	at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
	... 26 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for RocksDB state backend's local storage directories: Relative paths are not supported
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:273)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.configure(RocksDBStateBackend.java:296)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:47)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
	at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:157)
	at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
	... 31 more
Caused by: java.lang.IllegalArgumentException: Relative paths are not supported
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.setDbStoragePaths(RocksDBStateBackend.java:518)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:269)
	... 37 more
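The WARN line just before the error suggests what happened: the value of the deprecated key `state.backend.rocksdb.checkpointdir` is used for `state.backend.rocksdb.localdir`, and the exception then refers to the RocksDB local storage directories. A minimal sketch of such a deprecated-key fallback, assuming a plain `Map` stands in for the configuration; `DeprecatedKeyLookup` and `resolve` are hypothetical names, not Flink's `Configuration` API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a deprecated-key fallback; not Flink's Configuration class.
public class DeprecatedKeyLookup {

    static String resolve(Map<String, String> conf, String key, String deprecatedKey) {
        String value = conf.get(key);
        if (value == null && conf.containsKey(deprecatedKey)) {
            // Mirrors the WARN in the log: the old key's value is used under the new key's meaning.
            System.err.println("Config uses deprecated configuration key '" + deprecatedKey
                    + "' instead of proper key '" + key + "'");
            value = conf.get(deprecatedKey);
        }
        return value;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        // Value taken from the "Loading configuration property" lines in the log above.
        conf.put("state.backend.rocksdb.checkpointdir",
                "file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state");

        String localDir = resolve(conf, "state.backend.rocksdb.localdir",
                "state.backend.rocksdb.checkpointdir");
        // The file:// URI now stands in for the RocksDB local directory.
        System.out.println(localDir);
    }
}
```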


On Tue, Jul 3, 2018 at 5:11 PM, Chesnay Schepler wrote:
> Doesn't sound like intended behavior, can you give us the stacktrace?

Re: Checkpointing in Flink 1.5.0

Chesnay Schepler
Thanks. It looks like RocksDBStateBackend.setDbStoragePaths has some custom file path parsing logic; I will probe it a bit to see what the issue is.
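One hypothetical way custom parsing could yield exactly this message: if the setting were split on `,` and on the Unix path separator `:` to allow several directories, the URI `file:///opt/...` would be cut into `file` and `///opt/...`, and `file` is a relative path. This only illustrates a possible cause and is not the code of `RocksDBStateBackend`; `SplitPitfall` and `splitDirs` are made-up names.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not RocksDBStateBackend's code: splitting a
// multi-directory setting on ',' and ':' breaks apart a file:// URI.
public class SplitPitfall {

    static List<String> splitDirs(String value) {
        // ':' is File.pathSeparator on Unix-like systems; hard-coded here for determinism.
        return Arrays.asList(value.split("[,:]"));
    }

    public static void main(String[] args) {
        String configured = "file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state";
        for (String part : splitDirs(configured)) {
            // "file" is relative, so an absolute-path check would reject it.
            boolean absolute = new File(part).isAbsolute();
            System.out.println(part + " -> absolute=" + absolute);
        }
    }
}
```

On a Unix-like system this reports `file` as not absolute, which matches the "Relative paths are not supported" error even though the configured value is an absolute URI.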

On 03.07.2018 13:45, Data Engineer wrote:
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://myfl-flink-jobmanager-ui:8081.
2018-07-03 11:30:38,578 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2018-07-03 11:30:38,966 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2018-07-03 11:30:39,068 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@myfl-flink-jobmanager:4123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,069 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2018-07-03 11:30:39,164 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@myfl-flink-jobmanager:4123/user/dispatcher was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,165 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
2018-07-03 11:30:39,682 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Replacing old registration of TaskExecutor 068c693b9585900f68c53b00507ee889.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister TaskManager 8a5cee3aa38081030dc8558ac477d3b3 from the SlotManager.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The target with resource ID 068c693b9585900f68c53b00507ee889 is already been monitored.
2018-07-03 11:30:39,770 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager 068c693b9585900f68c53b00507ee889 under 03d409e5166fad4f4082b6165eb0de2e at the SlotManager.
2018-07-03 11:34:20,257 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job b684656d9afd75cc384a7bcd071bf55e (CSV Files Read -> CSV to Avro encode -> Kafka publish).
2018-07-03 11:34:20,269 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2018-07-03 11:34:20,278 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,285 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,289 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/67ceb2ae-1cb1-44be-a09e-601032e23fb5 .
2018-07-03 11:34:20,481 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-07-03 11:34:20,562 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,563 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 0 ms.
2018-07-03 11:34:20,580 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2018-07-03 11:34:20,590 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'
2018-07-03 11:34:20,592 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler    - Implementation error: Unhandled exception.
org.apache.flink.util.FlinkException: Failed to submit job b684656d9afd75cc384a7bcd071bf55e.
	at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
	at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
	at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
	at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
	at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
	at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
	... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
	at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
	at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
	... 26 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for RocksDB state backend's local storage directories: Relative paths are not supported
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:273)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.configure(RocksDBStateBackend.java:296)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:47)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
	at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:157)
	at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
	... 31 more
Caused by: java.lang.IllegalArgumentException: Relative paths are not supported
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.setDbStoragePaths(RocksDBStateBackend.java:518)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:269)
	... 37 more
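The last "Caused by" is thrown from RocksDBStateBackend.setDbStoragePaths, even though file:///opt/... is plainly an absolute location. One hypothesis is worth checking against the 1.5.0 source; it is an assumption and has not been confirmed in this thread. Suppose the configured value is split on commas and also on File.pathSeparator (":" on Linux) before each entry is checked. In that case the URI scheme becomes a separate entry, "file", and that entry is relative. The class below is hypothetical, and its names are invented for illustration. It only reproduces that mechanism with plain java.io.File:

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical reproduction (NOT Flink's source): tokenize a local-directory
 * setting on commas and on the platform path separator, then report every
 * token that java.io.File does not consider absolute.
 */
public class LocalDirSplitSketch {

    // Split a directory-list setting on ',' and on File.pathSeparator
    // (":" on Linux, ";" on Windows), as a path-list parser might.
    public static String[] splitDirs(String raw) {
        return raw.split(",|" + File.pathSeparator);
    }

    // Collect every token that java.io.File does not treat as absolute.
    public static List<String> relativeEntries(String raw) {
        List<String> relative = new ArrayList<>();
        for (String entry : splitDirs(raw)) {
            if (!new File(entry).isAbsolute()) {
                relative.add(entry);
            }
        }
        return relative;
    }

    public static void main(String[] args) {
        String withScheme = "file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state";
        String plainPath = "/opt/flink/share/myfl-flink/checkpoints/rocksdb_state";
        // On Linux the URI splits into "file" and "///opt/...": "file" is relative.
        System.out.println("with scheme -> relative tokens " + relativeEntries(withScheme));
        System.out.println("plain path  -> relative tokens " + relativeEntries(plainPath));
    }
}
```

On Linux the first line reports [file] and the second reports []. If this is the cause, it would also explain why the job could be submitted once the scheme was removed from the RocksDB directory setting.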


On Tue, Jul 3, 2018 at 5:11 PM, Chesnay Schepler <[hidden email]> wrote:
Doesn't sound like intended behavior, can you give us the stacktrace?


On 03.07.2018 13:17, Data Engineer wrote:
 The Flink documentation says that we need to specify the filesystem type (file://, hdfs://) when configuring the rocksdb backend dir.
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#the-rocksdbstatebackend

But when I do this, I get an error on job submission saying that relative paths are not permitted in the RocksDB state backend.
I am submitting the job via flink cli (bin/flink run).

Also, even though I give a local file system path "file:///home/abc/share", it is a shared GlusterFS volume mount, so it will be accessible by the JobManager and all TaskManagers.

I removed the filesystem type from the rocksdb backend dir configuration, and though the job got submitted, the rocksdb checkpoint directory was not created.
I have enabled checkpointing in my Flink application.

I am using Flink 1.5.0.

Any help or pointers would be appreciated.





Re: Checkpointing in Flink 1.5.0

Chesnay Schepler
The code appears to be working fine.

This may happen because you're using a GlusterFS volume.
The RocksDBStateBackend uses java.io.File internally (NOT NIO Paths), which AFAIK only works properly against the plain local file system.

The GlusterFS NIO FileSystem implementation also explicitly does not support conversion to File.
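The point about java.io.File versus NIO Paths can be checked with the JDK alone, because Path.toFile() only succeeds for paths from the default (local) provider. In the sketch below, the JDK's zip file system stands in for any non-default provider, such as a GlusterFS NIO implementation. That substitution is an assumption: the GlusterFS library itself is not used. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

/**
 * Sketch: code built on java.io.File can only use NIO paths from the default
 * provider. The zip provider is a stand-in for a non-default provider.
 */
public class ToFileSketch {

    // True if this NIO path can be converted to a java.io.File.
    public static boolean convertsToFile(Path p) {
        try {
            p.toFile();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    // Opens a throwaway zip archive as a second NIO file system and checks
    // whether a path inside it converts to java.io.File.
    public static boolean zipPathConvertsToFile() {
        try {
            Path tmpDir = Files.createTempDirectory("tofile-sketch");
            Path zip = tmpDir.resolve("demo.zip");
            URI zipUri = URI.create("jar:" + zip.toUri());
            try (FileSystem zipFs = FileSystems.newFileSystem(
                    zipUri, Collections.singletonMap("create", "true"))) {
                return convertsToFile(zipFs.getPath("/rocksdb"));
            } finally {
                // Runs after the zip file system has been closed.
                Files.deleteIfExists(zip);
                Files.deleteIfExists(tmpDir);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("default provider -> File: " + convertsToFile(Paths.get("/tmp", "rocksdb")));
        System.out.println("zip provider     -> File: " + zipPathConvertsToFile());
    }
}
```

A GlusterFS volume that is mounted into the local file system, as described in the original question, is reached through the default provider. This restriction would therefore most likely only come into play if the GlusterFS NIO library were used directly.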

On 03.07.2018 13:53, Chesnay Schepler wrote:
Thanks. It looks like RocksDBStateBackend.setDbStoragePaths has some custom file path parsing logic; I will probe it a bit to see what the issue is.
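For comparison, the sketch below shows a parser that handles the file: scheme before it applies the absolute-path check. The class is hypothetical and is not taken from Flink's source. It only illustrates the behavior one would expect for a local-directory entry:

- strip a file: scheme;
- reject any other scheme;
- require an absolute result.

```java
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * Hypothetical scheme-aware parser for one local directory entry.
 * Illustration only, not Flink's implementation.
 */
public class LocalDirParserSketch {

    // Turn one configured entry into a local directory. Accepts a plain
    // absolute path or a "file:" URI such as file:///opt/flink/state.
    // Unix-style paths are assumed: a Windows drive letter like "C:/x"
    // would be mistaken for a URI scheme here.
    public static File parseLocalDir(String raw) {
        String path = raw;
        try {
            URI uri = new URI(raw);
            if (uri.getScheme() != null) {
                if (!"file".equalsIgnoreCase(uri.getScheme())) {
                    throw new IllegalArgumentException("Path " + raw + " has a non-local scheme");
                }
                // For file:///opt/x this is "/opt/x"; opaque forms like "file:x" yield null.
                path = uri.getPath();
            }
        } catch (URISyntaxException e) {
            // Not a parseable URI (for example it contains spaces): treat it as a plain path.
        }
        if (path == null || !new File(path).isAbsolute()) {
            throw new IllegalArgumentException("Relative paths are not supported: " + raw);
        }
        return new File(path);
    }

    public static void main(String[] args) {
        System.out.println(parseLocalDir("file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state"));
        System.out.println(parseLocalDir("/tmp/rocksdb"));
        for (String bad : new String[] {"rocksdb_state", "hdfs:///flink/rocksdb"}) {
            try {
                parseLocalDir(bad);
            } catch (IllegalArgumentException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
}
```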

On 03.07.2018 13:45, Data Engineer wrote:
2018-07-03 11:30:35,703 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting StandaloneSessionClusterEntrypoint (Version: <unknown>, Rev:c61b108, Date:24.05.2018 @ 16:54:44 CEST)
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: flink
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b10
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 981 MiBytes
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /etc/alternatives/jre_openjdk/
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  No Hadoop Dependency available
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xms1024m
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx1024m
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog.file=/opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:/opt/flink-1.5.0/conf/log4j.properties
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:/opt/flink-1.5.0/conf/logback.xml
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments:
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --configDir
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     /opt/flink-1.5.0/conf
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --executionMode
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --host
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: /opt/flink-1.5.0/lib/flink-cep_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-connectors-1.5.0.jar:/opt/flink-1.5.0/lib/flink-gelly_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-ml_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-table_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/log4j-1.2.17.jar:/opt/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-07-03 11:30:35,712 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 4124
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, myfl-flink-jobmanager
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 4123
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.rpc.port, 4122
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.data.port, 4121
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.query.port, 4125
2018-07-03 11:30:35,722 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.address, myfl-flink-jobmanager-ui
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, rocksdb
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, file:///opt/flink/share/myfl-flink/checkpoints/ext_checkpoints
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.fs.checkpointdir, file:///opt/flink/share/myfl-flink/checkpoints/fs_state
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.checkpointdir, file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy, fixed-delay
2018-07-03 11:30:35,764 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.fixed-delay.attempts, 100
2018-07-03 11:30:35,764 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.fixed-delay.delay, 1 s
2018-07-03 11:30:35,885 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Starting StandaloneSessionClusterEntrypoint.
2018-07-03 11:30:35,885 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install default filesystem.
2018-07-03 11:30:35,892 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2018-07-03 11:30:35,963 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install security context.
2018-07-03 11:30:35,970 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,988 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,989 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
2018-07-03 11:30:36,003 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Trying to start actor system at myfl-flink-jobmanager:4123
2018-07-03 11:30:37,288 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2018-07-03 11:30:37,396 INFO  akka.remote.Remoting                                          - Starting remoting
2018-07-03 11:30:37,583 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@myfl-flink-jobmanager:4123]
2018-07-03 11:30:37,591 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@myfl-flink-jobmanager:4123
2018-07-03 11:30:37,611 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-e445bc66-cee3-4a3d-b810-74df02627eca
2018-07-03 11:30:37,613 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:4124 - max concurrent requests: 50 - max backlog: 1000
2018-07-03 11:30:37,629 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
2018-07-03 11:30:37,664 INFO  org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore  - Initializing FileArchivedExecutionGraphStore: Storage directory /tmp/executionGraphStore-4ff546b1-4bfb-4911-9314-89c61d7e7149, expiration time 3600000, maximum cache size 52428800 bytes.
2018-07-03 11:30:37,694 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /tmp/blobStore-7e0efdb8-f70b-42ed-9387-c0e1b8090b36
2018-07-03 11:30:37,702 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Upload directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.
2018-07-03 11:30:37,703 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Created directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload for file uploads.
2018-07-03 11:30:37,706 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component log file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component stdout file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.out
2018-07-03 11:30:38,567 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at myfl-flink-jobmanager-ui:8081
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://myfl-flink-jobmanager-ui:8081 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://myfl-flink-jobmanager-ui:8081.
2018-07-03 11:30:38,578 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2018-07-03 11:30:38,966 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2018-07-03 11:30:39,068 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@myfl-flink-jobmanager:4123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,069 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2018-07-03 11:30:39,164 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@myfl-flink-jobmanager:4123/user/dispatcher was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,165 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
2018-07-03 11:30:39,682 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Replacing old registration of TaskExecutor 068c693b9585900f68c53b00507ee889.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister TaskManager 8a5cee3aa38081030dc8558ac477d3b3 from the SlotManager.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The target with resource ID 068c693b9585900f68c53b00507ee889 is already been monitored.
2018-07-03 11:30:39,770 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager 068c693b9585900f68c53b00507ee889 under 03d409e5166fad4f4082b6165eb0de2e at the SlotManager.
2018-07-03 11:34:20,257 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job b684656d9afd75cc384a7bcd071bf55e (CSV Files Read -> CSV to Avro encode -> Kafka publish).
2018-07-03 11:34:20,269 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2018-07-03 11:34:20,278 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,285 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,289 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/67ceb2ae-1cb1-44be-a09e-601032e23fb5 .
2018-07-03 11:34:20,481 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-07-03 11:34:20,562 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,563 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 0 ms.
2018-07-03 11:34:20,580 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2018-07-03 11:34:20,590 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'
2018-07-03 11:34:20,592 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler    - Implementation error: Unhandled exception.
org.apache.flink.util.FlinkException: Failed to submit job b684656d9afd75cc384a7bcd071bf55e.
	at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
	at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
	at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
	at akka.actor.ActorCell.invoke(ActorCell.scala:495)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
	at akka.dispatch.Mailbox.run(Mailbox.scala:224)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
	at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
	at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
	at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
	at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
	at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
	... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
	at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
	at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
	at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
	at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
	... 26 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for RocksDB state backend's local storage directories: Relative paths are not supported
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:273)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.configure(RocksDBStateBackend.java:296)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:47)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
	at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:157)
	at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
	at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
	... 31 more
Caused by: java.lang.IllegalArgumentException: Relative paths are not supported
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.setDbStoragePaths(RocksDBStateBackend.java:518)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:269)
	... 37 more



RE: Checkpointing in Flink 1.5.0

Jash, Shaswata (Nokia - IN/Bangalore)

Hello Chesnay,

A cluster-wide checkpointing directory (in Kubernetes) on a GlusterFS volume mount, accessed via the file:/// scheme, worked fine for us up to 1.4.2. So we would like to understand where the breakage happened in 1.5.0.

Could you please point me to the source files containing the RocksDB “custom file path” parsing logic? We would be interested in investigating this.

I also observed the following in the log:

Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'

Regards,

Shaswata
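The deprecation warning also shows how the setting is interpreted: the value configured under state.backend.rocksdb.checkpointdir is read as state.backend.rocksdb.localdir, i.e. as the RocksDB state backend's local storage directories that the stack trace complains about. A minimal stand-alone sketch of that fallback, assuming a plain Map in place of Flink's Configuration; DeprecatedKeyLookup and resolve are hypothetical names, not Flink API:

```java
import java.util.HashMap;
import java.util.Map;

public class DeprecatedKeyLookup {

    // Hypothetical helper (not Flink API): prefer the current key and fall back to
    // the deprecated one, which is the behaviour the WARN line in the log reports.
    static String resolve(Map<String, String> conf, String key, String deprecatedKey) {
        String value = conf.get(key);
        return value != null ? value : conf.get(deprecatedKey);
    }

    public static void main(String[] args) {
        // Only the deprecated key is set, as in the configuration shown in the JobManager log.
        Map<String, String> conf = new HashMap<>();
        conf.put("state.backend.rocksdb.checkpointdir",
                "file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state");

        // The shared "checkpoint" URI ends up as RocksDB's local storage directory.
        String localDir = resolve(conf, "state.backend.rocksdb.localdir",
                "state.backend.rocksdb.checkpointdir");
        System.out.println("state.backend.rocksdb.localdir -> " + localDir);
    }
}
```

In this sketch, setting the proper key takes precedence over the deprecated one.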

 

From: Chesnay Schepler [mailto:[hidden email]]
Sent: Tuesday, July 03, 2018 5:52 PM
To: Data Engineer <[hidden email]>
Cc: [hidden email]
Subject: Re: Checkpointing in Flink 1.5.0

 

The code appears to be working fine.

This may happen because you're using a GlusterFS volume.
The RocksDBStateBackend uses Java Files internally (NOT nio Paths), which AFAIK only work properly against the plain local file system.

The GlusterFS nio FileSystem implementation also explicitly does not support conversion to File.
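The java.io.File versus nio Path distinction can be illustrated with the JDK alone. A minimal sketch, assuming a Unix-like host and using a hypothetical toLocalFile helper: a file:/// URI reduced to its path component gives an absolute File, and Path.toFile() works here only because Paths.get(URI) returns a Path from the default (local) provider. The JDK documents that toFile() throws UnsupportedOperationException for a Path from any other provider, which is the kind of limitation described for a GlusterFS nio FileSystem.

```java
import java.io.File;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;

public class FileVsNioPath {

    // Hypothetical helper: accept only file:// URIs and keep their path component.
    static File toLocalFile(String uriString) {
        URI uri = URI.create(uriString);
        if (!"file".equalsIgnoreCase(uri.getScheme())) {
            throw new IllegalArgumentException("Not a local file URI: " + uriString);
        }
        return new File(uri.getPath());
    }

    public static void main(String[] args) {
        String configured = "file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state";

        // java.io.File: on a Unix-like host the stripped path is absolute.
        File viaIo = toLocalFile(configured);
        System.out.println(viaIo + " absolute=" + viaIo.isAbsolute());

        // java.nio: Paths.get(URI) uses the default provider here, so toFile() works.
        // For a Path from a non-default provider, toFile() throws
        // UnsupportedOperationException.
        Path viaNio = Paths.get(URI.create(configured));
        System.out.println(viaNio.toFile().getPath());
    }
}
```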

On 03.07.2018 13:53, Chesnay Schepler wrote:

Thanks. It looks like RocksDBStateBackend.setDbStoragePaths has some custom file path parsing logic; I'll probe it a bit to see what the issue is.


Re: Checkpointing in Flink 1.5.0

Chesnay Schepler
It's not really path-parsing logic but rather path handling, I suppose; see RocksDBStateBackend#setDbStoragePaths().

I went ahead and converted that method into a simple test method; maybe this is enough to debug the issue.

I assume this regression was caused by FLINK-6557, which refactored the state backend to rely on Java Files instead of Flink Paths.
I'll open a JIRA to document it.

The deprecation notice is not a problem.

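import java.io.File;
import java.net.URI;

import org.apache.flink.core.fs.Path;

public class RocksDbPathCheck {

   // Test version of the path handling in RocksDBStateBackend#setDbStoragePaths().
   // Returns the resolved local directories so they can be inspected.
   public static File[] testPaths(String... paths) {
      if (paths.length == 0) {
         throw new IllegalArgumentException("empty paths");
      }

      File[] pp = new File[paths.length];

      for (int i = 0; i < paths.length; i++) {
         final String rawPath = paths[i];
         final String path;

         if (rawPath == null) {
            throw new IllegalArgumentException("null path");
         }

         // we need this for backwards compatibility, to allow URIs like 'file:///'...
         URI uri = null;
         try {
            uri = new Path(rawPath).toUri();
         }
         catch (Exception e) {
            // cannot parse as a path; treat the raw string as a plain file name below
         }

         if (uri != null && uri.getScheme() != null) {
            if ("file".equalsIgnoreCase(uri.getScheme())) {
               // keep only the path component, e.g. file:///opt/x -> /opt/x
               path = uri.getPath();
            }
            else {
               throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
            }
         }
         else {
            path = rawPath;
         }

         pp[i] = new File(path);
         if (!pp[i].isAbsolute()) { // my suspicion is that this categorically fails for GlusterFS paths
            throw new IllegalArgumentException("Relative paths are not supported");
         }
      }
      return pp;
   }

   public static void main(String[] args) {
      // pass the configured RocksDB directory (or directories) as arguments
      for (File f : testPaths(args)) {
         System.out.println(f + " -> OK");
      }
   }
}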
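testPaths receives paths that have already been separated, so it may also be worth checking what reaches it. One hypothesis consistent with the stack trace (the RocksDBStateBackend constructor calls setDbStoragePaths while applying the configuration) is that the configured string is first split on "," or the platform path separator, which is ":" on Unix. If that assumption holds, file:///opt/... would be cut into "file" and "///opt/...", and "file" on its own is a relative path; that would also fit the observation that the job was accepted once the scheme was removed. The sketch below is stdlib-only, swaps Flink's Path for java.net.URI, and uses the hypothetical helpers splitLikeConfig and relativeEntries; the split pattern is an assumption, not something confirmed in this thread.

```java
import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class LocalDirSplitCheck {

    // ASSUMPTION (not verified against the Flink 1.5.0 source): the configured
    // local-dir string is split on ',' or File.pathSeparator (':' on Unix)
    // before each entry is handed to setDbStoragePaths().
    static String[] splitLikeConfig(String configured) {
        return configured.split(",|" + File.pathSeparator);
    }

    // Stdlib stand-in for the absolute-path check in testPaths(), using
    // java.net.URI instead of Flink's Path. Returns the entries that would fail.
    static List<String> relativeEntries(String... entries) {
        List<String> relative = new ArrayList<>();
        for (String raw : entries) {
            String path = raw;
            URI uri = null;
            try {
                uri = URI.create(raw);
            } catch (IllegalArgumentException e) {
                // not parseable as a URI; treat it as a plain file name
            }
            if (uri != null && "file".equalsIgnoreCase(uri.getScheme())) {
                path = uri.getPath();
            }
            if (!new File(path).isAbsolute()) {
                relative.add(raw);
            }
        }
        return relative;
    }

    public static void main(String[] args) {
        String configured = "file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state";

        // Passed as one entry, the URI resolves to an absolute path.
        System.out.println("whole value, relative entries: " + relativeEntries(configured));

        // Split first (assumed behaviour), the scheme becomes its own entry.
        String[] parts = splitLikeConfig(configured);
        System.out.println("split value, relative entries: " + relativeEntries(parts));
    }
}
```

Run on a Unix host, the first line reports no relative entries and the second reports [file].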


On 03.07.2018 16:35, Jash, Shaswata (Nokia - IN/Bangalore) wrote:

2018-07-03 11:30:35,970 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,988 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,989 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
2018-07-03 11:30:36,003 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Trying to start actor system at myfl-flink-jobmanager:4123
2018-07-03 11:30:37,288 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2018-07-03 11:30:37,396 INFO  akka.remote.Remoting                                          - Starting remoting
2018-07-03 11:30:37,583 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@myfl-flink-jobmanager:4123]
2018-07-03 11:30:37,591 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@myfl-flink-jobmanager:4123
2018-07-03 11:30:37,611 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-e445bc66-cee3-4a3d-b810-74df02627eca
2018-07-03 11:30:37,613 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:4124 - max concurrent requests: 50 - max backlog: 1000
2018-07-03 11:30:37,629 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
2018-07-03 11:30:37,664 INFO  org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore  - Initializing FileArchivedExecutionGraphStore: Storage directory /tmp/executionGraphStore-4ff546b1-4bfb-4911-9314-89c61d7e7149, expiration time 3600000, maximum cache size 52428800 bytes.
2018-07-03 11:30:37,694 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /tmp/blobStore-7e0efdb8-f70b-42ed-9387-c0e1b8090b36
2018-07-03 11:30:37,702 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Upload directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.
2018-07-03 11:30:37,703 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Created directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload for file uploads.
2018-07-03 11:30:37,706 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component log file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component stdout file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.out
2018-07-03 11:30:38,567 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at myfl-flink-jobmanager-ui:8081
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://myfl-flink-jobmanager-ui:8081 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://myfl-flink-jobmanager-ui:8081.
2018-07-03 11:30:38,578 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2018-07-03 11:30:38,966 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2018-07-03 11:30:39,068 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@myfl-flink-jobmanager:4123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,069 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2018-07-03 11:30:39,164 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@myfl-flink-jobmanager:4123/user/dispatcher was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,165 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
2018-07-03 11:30:39,682 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Replacing old registration of TaskExecutor 068c693b9585900f68c53b00507ee889.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister TaskManager 8a5cee3aa38081030dc8558ac477d3b3 from the SlotManager.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The target with resource ID 068c693b9585900f68c53b00507ee889 is already been monitored.
2018-07-03 11:30:39,770 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager 068c693b9585900f68c53b00507ee889 under 03d409e5166fad4f4082b6165eb0de2e at the SlotManager.
2018-07-03 11:34:20,257 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job b684656d9afd75cc384a7bcd071bf55e (CSV Files Read -> CSV to Avro encode -> Kafka publish).
2018-07-03 11:34:20,269 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2018-07-03 11:34:20,278 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,285 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,289 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/67ceb2ae-1cb1-44be-a09e-601032e23fb5 .
2018-07-03 11:34:20,481 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-07-03 11:34:20,562 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,563 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 0 ms.
2018-07-03 11:34:20,580 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2018-07-03 11:34:20,590 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'
2018-07-03 11:34:20,592 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler    - Implementation error: Unhandled exception.
org.apache.flink.util.FlinkException: Failed to submit job b684656d9afd75cc384a7bcd071bf55e.
  at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
  at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
  at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
  at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
  at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
  at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
  at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
  ... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
  at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
  at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
  at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
  at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
  ... 26 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for RocksDB state backend's local storage directories: Relative paths are not supported
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:273)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.configure(RocksDBStateBackend.java:296)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:47)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
  at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:157)
  at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
  ... 31 more
Caused by: java.lang.IllegalArgumentException: Relative paths are not supported
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.setDbStoragePaths(RocksDBStateBackend.java:518)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:269)
  ... 37 more

On Tue, Jul 3, 2018 at 5:11 PM, Chesnay Schepler <[hidden email]> wrote:

Doesn't sound like intended behavior, can you give us the stacktrace?



On 03.07.2018 13:17, Data Engineer wrote:

 The Flink documentation says that we need to specify the filesystem type (file://, hdfs://) when configuring the rocksdb backend dir.
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#the-rocksdbstatebackend

But when I do this, I get an error on job submission saying that relative paths are not permitted in the RocksDB state backend.
I am submitting the job via flink cli (bin/flink run).

Also, even though I give a local file system path "file:///home/abc/share", it is a shared GlusterFS volume mount, so it will be accessible by the JobManager and all TaskManagers.

I removed the filesystem type from the rocksdb backend dir configuration, and though the job got submitted, the rocksdb checkpoint directory was not created.
I have enabled checkpointing in my Flink application.

I am using Flink 1.5.0.

Any help or pointers would be appreciated.


Re: Checkpointing in Flink 1.5.0

Chesnay Schepler
Reference: https://issues.apache.org/jira/browse/FLINK-9739

On 04.07.2018 10:46, Chesnay Schepler wrote:
It's not really path-parsing logic but path handling, I suppose; see RocksDBStateBackend#setDbStoragePaths().

I went ahead and converted that method into a simple test method; maybe this is enough to debug the issue.

I assume this regression was caused by FLINK-6557, which refactored the state backend to rely on Java Files instead of Flink paths.
I'll open a JIRA to document it.

The deprecation notice is not a problem.

import java.io.File;
import java.net.URI;

import org.apache.flink.core.fs.Path;

public class RocksDBPathCheck {

   // Same path handling as RocksDBStateBackend#setDbStoragePaths(), minus the state backend.
   public static void testPaths(String... paths) {
      if (paths.length == 0) {
         throw new IllegalArgumentException("empty paths");
      }

      File[] pp = new File[paths.length];

      for (int i = 0; i < paths.length; i++) {
         final String rawPath = paths[i];
         final String path;

         if (rawPath == null) {
            throw new IllegalArgumentException("null path");
         }

         // we need this for backwards compatibility, to allow URIs like 'file:///'...
         URI uri = null;
         try {
            uri = new Path(rawPath).toUri();
         }
         catch (Exception e) {
            // cannot parse as a path
         }

         if (uri != null && uri.getScheme() != null) {
            if ("file".equalsIgnoreCase(uri.getScheme())) {
               // local scheme: keep only the path component, e.g. "file:///opt/x" -> "/opt/x"
               path = uri.getPath();
            }
            else {
               throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
            }
         }
         else {
            // no scheme: use the raw string as-is
            path = rawPath;
         }

         pp[i] = new File(path);
         if (!pp[i].isAbsolute()) { // my suspicion is that this categorically fails for GlusterFS paths
            throw new IllegalArgumentException("Relative paths are not supported");
         }
      }
   }

   public static void main(String[] args) {
      // pass the configured directories as program arguments
      testPaths(args);
      System.out.println("all paths accepted");
   }
}
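For trying the same check without Flink on the classpath, here is a minimal stdlib-only approximation. It assumes java.net.URI parses these inputs close enough to org.apache.flink.core.fs.Path#toUri() (not verified for every edge case), and the class name LocalDirCheck is hypothetical. It returns the resolved File objects instead of discarding them, and it also shows that passing a raw "file:///" string straight to java.io.File yields a relative path, which is why the scheme has to be stripped first.

```java
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;

/** Hypothetical stdlib-only approximation of the testPaths() check (no Flink dependency). */
public class LocalDirCheck {

    /** Resolves each directory to a File, rejecting non-local schemes and relative paths. */
    public static File[] resolve(String... rawPaths) {
        if (rawPaths.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        File[] result = new File[rawPaths.length];
        for (int i = 0; i < rawPaths.length; i++) {
            String raw = rawPaths[i];
            if (raw == null) {
                throw new IllegalArgumentException("null path");
            }
            String path = raw;
            try {
                URI uri = new URI(raw);
                if (uri.getScheme() != null) {
                    if (!"file".equalsIgnoreCase(uri.getScheme())) {
                        throw new IllegalArgumentException("Path " + raw + " has a non-local scheme");
                    }
                    // "file:///opt/x" -> "/opt/x"; opaque forms like "file:opt" fall back to "opt"
                    path = uri.getPath() != null ? uri.getPath() : uri.getSchemeSpecificPart();
                }
            } catch (URISyntaxException e) {
                // not parseable as a URI: use the raw string, as the original does
            }
            File f = new File(path);
            if (!f.isAbsolute()) {
                throw new IllegalArgumentException("Relative paths are not supported: " + raw);
            }
            result[i] = f;
        }
        return result;
    }

    public static void main(String[] args) {
        // With the scheme stripped first, the configured value resolves to an absolute local path.
        System.out.println(resolve("file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state")[0]);
        // Handing the raw URI string to java.io.File directly gives a relative path (false on Unix).
        System.out.println(new File("file:///opt/flink/share").isAbsolute());
    }
}
```

On a Unix host the first line prints the plain absolute path, and the second prints false, since java.io.File treats "file:" as the first segment of a relative name.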


On 03.07.2018 16:35, Jash, Shaswata (Nokia - IN/Bangalore) wrote:

Hello Chesnay,

A cluster-wide checkpointing directory (in Kubernetes) on a GlusterFS volume mount, and hence accessed via the file:/// scheme, was working fine for us up to 1.4.2, so we would like to understand where the breakage happened in 1.5.0.

Can you please point me to the source files containing the RocksDB "custom file path" parsing logic? We would be interested in investigating this.

I also observed the following in the log:

Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'
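The warning means the value was still picked up, just under the old name. Flink's ConfigOption can declare deprecated keys as fallbacks; the sketch below is a simplified, hypothetical model of that lookup order (new key first, then deprecated keys, with a warning) and is not Flink's implementation. The class name and the WARNINGS list standing in for a logger are assumptions.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of a config lookup that falls back to deprecated keys. */
public class DeprecatedKeyLookup {

    /** Warnings collected during lookups (stands in for a logger). */
    public static final List<String> WARNINGS = new ArrayList<>();

    /** Returns the value for key, else for the first deprecated key present, else null. */
    public static String get(Map<String, String> conf, String key, String... deprecatedKeys) {
        if (conf.containsKey(key)) {
            return conf.get(key);
        }
        for (String old : deprecatedKeys) {
            if (conf.containsKey(old)) {
                WARNINGS.add("Config uses deprecated configuration key '" + old
                        + "' instead of proper key '" + key + "'");
                return conf.get(old);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("state.backend.rocksdb.checkpointdir",
                "file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state");
        String dir = get(conf, "state.backend.rocksdb.localdir", "state.backend.rocksdb.checkpointdir");
        System.out.println(dir);
        WARNINGS.forEach(System.out::println);
    }
}
```

Under this model, renaming the entry to state.backend.rocksdb.localdir only silences the warning; the same string still reaches the path handling, which is consistent with the reply that the deprecation notice is not a problem.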

Regards,

Shaswata

From: Chesnay Schepler [[hidden email]]
Sent: Tuesday, July 03, 2018 5:52 PM
To: Data Engineer [hidden email]
Cc: [hidden email]
Subject: Re: Checkpointing in Flink 1.5.0

The code appears to be working fine.

This may happen because you're using a GlusterFS volume.
The RocksDBStateBackend uses Java Files internally (NOT nio Paths), which AFAIK only work properly against the plain local file system.

The GlusterFS nio FileSystem implementation also explicitly does not support conversion to File.
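The File-versus-nio distinction can be observed with the JDK alone. In this sketch the JDK's built-in zip file system provider stands in for a non-default nio provider such as a GlusterFS one; that substitution, and the class name ToFileDemo, are assumptions. Path#toFile() succeeds for paths from the default provider but throws UnsupportedOperationException for paths from other providers, so code built on java.io.File can only address what the default provider exposes.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

/** Hypothetical demo: Path#toFile() only works for the default file system provider. */
public class ToFileDemo {

    /** Returns true if the path can be converted to a java.io.File. */
    public static boolean convertsToFile(Path p) {
        try {
            p.toFile();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    /** Creates a throwaway zip file system and checks whether one of its paths converts to File. */
    public static boolean zipPathConvertsToFile() {
        try {
            Path dir = Files.createTempDirectory("tofile-demo");
            Path zip = dir.resolve("demo.zip");
            URI uri = URI.create("jar:" + zip.toUri());
            boolean result;
            try (FileSystem zipFs = FileSystems.newFileSystem(uri, Collections.singletonMap("create", "true"))) {
                result = convertsToFile(zipFs.getPath("/rocksdb_state"));
            }
            // clean up the temporary zip archive and its directory
            Files.deleteIfExists(zip);
            Files.deleteIfExists(dir);
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("default provider: " + convertsToFile(Paths.get("/opt/flink/share")));
        System.out.println("zip provider:     " + zipPathConvertsToFile());
    }
}
```

Note that a volume mounted at the OS level is reached through the default provider, so this demo illustrates the general limitation rather than pinpointing how the GlusterFS mount in this thread is accessed.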

On 03.07.2018 13:53, Chesnay Schepler wrote:

Thanks. It looks like RocksDBStateBackend.setDbStoragePaths has some custom file path parsing logic; I will probe it a bit to see what the issue is.
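Since the error only appears when the file:// scheme is present, one hypothesis worth probing (not confirmed in this thread) is that the configured directory string is split into several directories before it reaches setDbStoragePaths. If that split used File.pathSeparator, which is ':' on Linux, then "file:///opt/..." would become "file" and "///opt/...", and the first piece is a relative path. This sketch only illustrates that mechanism; the class and method names are hypothetical, and the separator is passed in explicitly so the result does not depend on the host OS.

```java
import java.io.File;
import java.util.regex.Pattern;

/** Hypothetical illustration: splitting a directory list on ',' or a path separator. */
public class SplitHypothesis {

    /** Splits a configured directory list on ',' or the given separator (e.g. ":" on Linux). */
    public static String[] split(String value, String separator) {
        return value.split(",|" + Pattern.quote(separator));
    }

    /** Returns the first piece that java.io.File considers relative, or null if all are absolute. */
    public static String firstRelative(String value, String separator) {
        for (String piece : split(value, separator)) {
            if (!new File(piece).isAbsolute()) {
                return piece;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        String configured = "file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state";
        System.out.println(String.join(" | ", split(configured, ":")));
        System.out.println("first relative piece: " + firstRelative(configured, ":"));
        System.out.println("without scheme: "
                + firstRelative("/opt/flink/share/myfl-flink/checkpoints/rocksdb_state", ":"));
    }
}
```

If this is what happens, it would fit the original report that the job was accepted once the scheme was removed, and it would explain why the path-handling method appears to work fine when tested in isolation.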

On 03.07.2018 13:45, Data Engineer wrote:

[quoted JobManager log omitted; identical to the log posted earlier in this thread]
org.apache.flink.util.FlinkException: Failed to submit job b684656d9afd75cc384a7bcd071bf55e.
  at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
  at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
  at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
  at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
  at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
  at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
  at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
  ... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
  at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
  at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
  at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
  at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
  ... 26 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for RocksDB state backend's local storage directories: Relative paths are not supported
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:273)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.configure(RocksDBStateBackend.java:296)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:47)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
  at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:157)
  at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
  ... 31 more
Caused by: java.lang.IllegalArgumentException: Relative paths are not supported
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.setDbStoragePaths(RocksDBStateBackend.java:518)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:269)
  ... 37 more

On Tue, Jul 3, 2018 at 5:11 PM, Chesnay Schepler <[hidden email]> wrote:

Doesn't sound like intended behavior, can you give us the stacktrace?



On 03.07.2018 13:17, Data Engineer wrote:

The Flink documentation says that we need to specify the filesystem type (file://, hdfs://) when configuring the rocksdb backend dir.
https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/state/state_backends.html#the-rocksdbstatebackend

But when I do this, I get an error on job submission saying that relative paths are not permitted in the rocksdb state backend.
I am submitting the job via flink cli (bin/flink run).

Also, even though I give a local file system path "file:///home/abc/share", it is a shared GlusterFS volume mount, so it will be accessible by the JobManager and all TaskManagers.

I removed the filesystem type from the rocksdb backend dir configuration, and though the job got submitted, the rocksdb checkpoint directory was not created.
I have enabled checkpointing in my Flink application.

I am using Flink 1.5.0.

Any help or pointers would be appreciated.

Re: Checkpointing in Flink 1.5.0

Sampath Bhat
Chesnay, why is the absolute-path check required in RocksDBStateBackend.setDbStoragePaths(String... paths)? I think this check is what causes the issue. It is not related to GlusterFS or the file system. The same problem can be reproduced on a local machine with the following configuration, using a Flink application that has checkpointing enabled. We get the IllegalArgumentException ("Relative paths are not supported"):

state.backend: rocksdb
state.checkpoints.dir: file:///home/demo/checkpoints/ext_checkpoints
state.savepoints.dir: file:///home/demo/checkpoints/checkpoints/savepoints
state.backend.fs.checkpointdir: file:///home/demo/checkpoints/checkpoints/fs_state
#state.backend.rocksdb.checkpointdir: file:///home/demo/checkpoints/checkpoints/rocksdb_state
state.backend.rocksdb.localdir: /home/demo/checkpoints/checkpoints/rocksdb_state

Any insights would be helpful.
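The thread does not say why setDbStoragePaths() insists on absolute paths. One plausible reason, which is an assumption, is that java.io.File resolves a relative path against each JVM's working directory. That directory need not be the same on the JobManager and on every TaskManager. A second pitfall is that java.io.File does not treat a raw file:/// string as an absolute path. Only the path component of the URI is absolute. The JDK-only sketch below shows both behaviours on a Unix-like system. The class and method names are hypothetical, and the sketch does not call Flink.

```java
import java.io.File;
import java.net.URI;

public class PathAbsoluteness {

    // True if java.io.File treats the raw string as an absolute path.
    static boolean isAbsoluteAsFile(String raw) {
        return new File(raw).isAbsolute();
    }

    // Takes the path component of a file: URI (if any), then checks absoluteness.
    static boolean isAbsoluteAfterSchemeStrip(String raw) {
        URI uri = URI.create(raw);
        String path = "file".equalsIgnoreCase(uri.getScheme()) ? uri.getPath() : raw;
        return path != null && new File(path).isAbsolute();
    }

    public static void main(String[] args) {
        String uriStyle = "file:///home/demo/checkpoints/rocksdb_state";
        String plain = "/home/demo/checkpoints/rocksdb_state";
        String relative = "rocksdb_state";

        System.out.println(isAbsoluteAsFile(uriStyle));           // false on Unix: the string does not start with '/'
        System.out.println(isAbsoluteAfterSchemeStrip(uriStyle)); // true: the URI path is "/home/demo/..."
        System.out.println(isAbsoluteAsFile(plain));              // true on Unix
        // A relative path is resolved against this JVM's working directory (user.dir).
        System.out.println(new File(relative).getAbsolutePath());
    }
}
```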

On Wed, Jul 4, 2018 at 2:27 PM, Chesnay Schepler <[hidden email]> wrote:
Reference: https://issues.apache.org/jira/browse/FLINK-9739


On 04.07.2018 10:46, Chesnay Schepler wrote:
It's not really path-parsing logic but path handling, I suppose; see RocksDBStateBackend#setDbStoragePaths().

I went ahead and converted said method into a simple test method; maybe this is enough to debug the issue.

I assume this regression was caused by FLINK-6557, which refactored the state backend to rely on Java Files instead of Flink paths.
I'll open a JIRA to document it.

The deprecation notice is not a problem.

import java.io.File;
import java.net.URI;

import org.apache.flink.core.fs.Path;

public class RocksDBPathTest {

   // Standalone copy of the path handling in RocksDBStateBackend#setDbStoragePaths().
   public static void testPaths(String... paths) {
      if (paths.length == 0) {
         throw new IllegalArgumentException("empty paths");
      }
      else {
         File[] pp = new File[paths.length];

         for (int i = 0; i < paths.length; i++) {
            final String rawPath = paths[i];
            final String path;

            if (rawPath == null) {
               throw new IllegalArgumentException("null path");
            }
            else {
               // we need this for backwards compatibility, to allow URIs like 'file:///'...
               URI uri = null;
               try {
                  uri = new Path(rawPath).toUri();
               }
               catch (Exception e) {
                  // cannot parse as a path
               }

               if (uri != null && uri.getScheme() != null) {
                  if ("file".equalsIgnoreCase(uri.getScheme())) {
                     path = uri.getPath();
                  }
                  else {
                     throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
                  }
               }
               else {
                  path = rawPath;
               }
            }

            pp[i] = new File(path);
            if (!pp[i].isAbsolute()) { // my suspicion is that this categorically fails for GlusterFS paths
               throw new IllegalArgumentException("Relative paths are not supported");
            }
         }
      }
   }

   // Pass the configured directory (or directories) as program arguments.
   public static void main(String[] args) {
      testPaths(args);
   }
}
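Below is a hypothetical JDK-only variant of testPaths() above, for experimenting without Flink on the classpath. It swaps Flink's org.apache.flink.core.fs.Path for java.net.URI, so its parsing may differ from Flink's in edge cases such as Windows drive letters or strings containing spaces. It also returns the resolved directories instead of discarding them.

```java
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;

public class LocalDirCheck {

    // Hypothetical variant of testPaths(): java.net.URI stands in for Flink's Path.
    public static File[] resolveLocalDirs(String... paths) {
        if (paths.length == 0) {
            throw new IllegalArgumentException("empty paths");
        }
        File[] dirs = new File[paths.length];
        for (int i = 0; i < paths.length; i++) {
            String rawPath = paths[i];
            if (rawPath == null) {
                throw new IllegalArgumentException("null path");
            }
            String path = rawPath;
            URI uri = null;
            try {
                uri = new URI(rawPath);
            } catch (URISyntaxException e) {
                // Not a parseable URI: treat the raw string as a plain path.
            }
            if (uri != null && uri.getScheme() != null) {
                if (!"file".equalsIgnoreCase(uri.getScheme())) {
                    throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
                }
                if (uri.getPath() == null) {
                    // Opaque URIs such as "file:dir" have no path component.
                    throw new IllegalArgumentException("Path " + rawPath + " has no path component");
                }
                path = uri.getPath();
            }
            dirs[i] = new File(path);
            if (!dirs[i].isAbsolute()) {
                throw new IllegalArgumentException("Relative paths are not supported: " + rawPath);
            }
        }
        return dirs;
    }

    public static void main(String[] args) {
        for (String p : args) {
            try {
                System.out.println(p + " -> " + resolveLocalDirs(p)[0].getPath());
            } catch (IllegalArgumentException e) {
                System.out.println(p + " -> rejected: " + e.getMessage());
            }
        }
    }
}
```

For example, running `java LocalDirCheck file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state relative_dir` prints the resolved absolute path for the first argument and a rejection for the second. That result for the first argument matches the observation in this thread that the method itself appears to handle plain file:/// URIs.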


On 03.07.2018 16:35, Jash, Shaswata (Nokia - IN/Bangalore) wrote:

Hello Chesnay,

A cluster-wide checkpointing directory (in Kubernetes) on a GlusterFS volume mount, accessed through the file:/// scheme, worked fine for us up to 1.4.2, so we would like to understand where the breakage happened in 1.5.0.

Could you please point me to the relevant source files for the RocksDB “custom file path” parsing logic? We would be interested in investigating this.

I also observed the following in the log:

Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'

Regards,

Shaswata

From: Chesnay Schepler [[hidden email]]
Sent: Tuesday, July 03, 2018 5:52 PM
To: Data Engineer [hidden email]
Cc: [hidden email]
Subject: Re: Checkpointing in Flink 1.5.0

The code appears to be working fine.

This may happen because you're using a GlusterFS volume.
The RocksDBStateBackend uses Java Files internally (NOT nio Paths), which AFAIK only work properly against the plain local file system.

The GlusterFS nio FileSystem implementation also explicitly does not support conversion to File.
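You can check the point about java.io.File versus nio Paths with the JDK alone. Path.toFile() throws UnsupportedOperationException for a path that does not belong to the default file-system provider. The sketch below uses the JDK's zip file system as a stand-in for a non-default provider such as the GlusterFS nio implementation mentioned above. It does not exercise GlusterFS itself, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;

public class ToFileCheck {

    // True if this nio Path can be converted to a java.io.File.
    static boolean supportsToFile(Path p) {
        try {
            p.toFile();
            return true;
        } catch (UnsupportedOperationException e) {
            // Path.toFile() throws this for paths not owned by the default provider.
            return false;
        }
    }

    // Opens a temporary zip file system (stand-in for any non-default nio
    // provider) and checks whether a path inside it converts to a File.
    static boolean zipPathSupportsToFile() {
        try {
            Path zip = Files.createTempFile("tofile-demo", ".zip");
            Files.delete(zip); // the zip provider must create the archive itself
            URI zipUri = URI.create("jar:" + zip.toUri());
            try (FileSystem zipFs = FileSystems.newFileSystem(zipUri, Collections.singletonMap("create", "true"))) {
                return supportsToFile(zipFs.getPath("/inside.txt"));
            } finally {
                Files.deleteIfExists(zip);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("default provider: " + supportsToFile(Paths.get("/tmp")));
        System.out.println("zip provider:     " + zipPathSupportsToFile());
    }
}
```

On a typical JDK this prints true for the default provider and false for the zip provider. This only matters when paths come from such a provider. A volume mounted into the operating system's file tree is normally reached through the default provider.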

On 03.07.2018 13:53, Chesnay Schepler wrote:

Thanks. It looks like RocksDBStateBackend.setDbStoragePaths has some custom file path parsing logic; I will probe it a bit to see what the issue is.

On 03.07.2018 13:45, Data Engineer wrote:

2018-07-03 11:30:35,703 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting StandaloneSessionClusterEntrypoint (Version: <unknown>, Rev:c61b108, Date:24.05.2018 @ 16:54:44 CEST)
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: flink
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b10
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 981 MiBytes
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /etc/alternatives/jre_openjdk/
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  No Hadoop Dependency available
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xms1024m
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx1024m
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog.file=/opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:/opt/flink-1.5.0/conf/log4j.properties
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:/opt/flink-1.5.0/conf/logback.xml
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments:
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --configDir
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     /opt/flink-1.5.0/conf
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --executionMode
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --host
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: /opt/flink-1.5.0/lib/flink-cep_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-connectors-1.5.0.jar:/opt/flink-1.5.0/lib/flink-gelly_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-ml_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-table_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/log4j-1.2.17.jar:/opt/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-07-03 11:30:35,712 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 4124
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, myfl-flink-jobmanager
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 4123
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.rpc.port, 4122
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.data.port, 4121
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.query.port, 4125
2018-07-03 11:30:35,722 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.address, myfl-flink-jobmanager-ui
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, rocksdb
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, file:///opt/flink/share/myfl-flink/checkpoints/ext_checkpoints
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.fs.checkpointdir, file:///opt/flink/share/myfl-flink/checkpoints/fs_state
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.checkpointdir, file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy, fixed-delay
2018-07-03 11:30:35,764 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.fixed-delay.attempts, 100
2018-07-03 11:30:35,764 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.fixed-delay.delay, 1 s
2018-07-03 11:30:35,885 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Starting StandaloneSessionClusterEntrypoint.
2018-07-03 11:30:35,885 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install default filesystem.
2018-07-03 11:30:35,892 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2018-07-03 11:30:35,963 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install security context.
2018-07-03 11:30:35,970 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,988 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,989 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
2018-07-03 11:30:36,003 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Trying to start actor system at myfl-flink-jobmanager:4123
2018-07-03 11:30:37,288 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2018-07-03 11:30:37,396 INFO  akka.remote.Remoting                                          - Starting remoting
2018-07-03 11:30:37,583 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@myfl-flink-jobmanager:4123]
2018-07-03 11:30:37,591 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@myfl-flink-jobmanager:4123
2018-07-03 11:30:37,611 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-e445bc66-cee3-4a3d-b810-74df02627eca
2018-07-03 11:30:37,613 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:4124 - max concurrent requests: 50 - max backlog: 1000
2018-07-03 11:30:37,629 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
2018-07-03 11:30:37,664 INFO  org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore  - Initializing FileArchivedExecutionGraphStore: Storage directory /tmp/executionGraphStore-4ff546b1-4bfb-4911-9314-89c61d7e7149, expiration time 3600000, maximum cache size 52428800 bytes.
2018-07-03 11:30:37,694 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /tmp/blobStore-7e0efdb8-f70b-42ed-9387-c0e1b8090b36
2018-07-03 11:30:37,702 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Upload directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.
2018-07-03 11:30:37,703 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Created directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload for file uploads.
2018-07-03 11:30:37,706 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component log file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component stdout file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.out
2018-07-03 11:30:38,567 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at myfl-flink-jobmanager-ui:8081
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://myfl-flink-jobmanager-ui:8081 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://myfl-flink-jobmanager-ui:8081.
2018-07-03 11:30:38,578 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2018-07-03 11:30:38,966 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2018-07-03 11:30:39,068 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@myfl-flink-jobmanager:4123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,069 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2018-07-03 11:30:39,164 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@myfl-flink-jobmanager:4123/user/dispatcher was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,165 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
2018-07-03 11:30:39,682 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Replacing old registration of TaskExecutor 068c693b9585900f68c53b00507ee889.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister TaskManager 8a5cee3aa38081030dc8558ac477d3b3 from the SlotManager.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The target with resource ID 068c693b9585900f68c53b00507ee889 is already been monitored.
2018-07-03 11:30:39,770 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager 068c693b9585900f68c53b00507ee889 under 03d409e5166fad4f4082b6165eb0de2e at the SlotManager.
2018-07-03 11:34:20,257 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job b684656d9afd75cc384a7bcd071bf55e (CSV Files Read -> CSV to Avro encode -> Kafka publish).
2018-07-03 11:34:20,269 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2018-07-03 11:34:20,278 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,285 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,289 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/67ceb2ae-1cb1-44be-a09e-601032e23fb5 .
2018-07-03 11:34:20,481 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-07-03 11:34:20,562 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,563 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 0 ms.
2018-07-03 11:34:20,580 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2018-07-03 11:34:20,590 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'
2018-07-03 11:34:20,592 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler    - Implementation error: Unhandled exception.
org.apache.flink.util.FlinkException: Failed to submit job b684656d9afd75cc384a7bcd071bf55e.
  at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
  at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
  at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
  at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
  at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
  at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
  at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
  ... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
  at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
  at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
  at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
  at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
  ... 26 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for RocksDB state backend's local storage directories: Relative paths are not supported
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:273)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.configure(RocksDBStateBackend.java:296)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:47)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
  at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:157)
  at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
  ... 31 more
Caused by: java.lang.IllegalArgumentException: Relative paths are not supported
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.setDbStoragePaths(RocksDBStateBackend.java:518)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:269)
  ... 37 more

 

 


Re: Checkpointing in Flink 1.5.0

James Isaac
As a workaround, we commented out state.backend.rocksdb.localdir, since it defaults to the taskmanager.tmp.dirs location.

Now we have only these state backend settings in our flink-conf.yaml:
state.backend: rocksdb
state.checkpoints.dir: file:///home/demo/checkpoints/ext_checkpoints
state.savepoints.dir: file:///home/demo/checkpoints/checkpoints/savepoints

Checkpointing and savepointing work with the above configuration.

However, I wasn't able to find the RocksDB directory, which was supposed to be inside the /tmp directory. I found only these directories inside /tmp on the TaskManager:

drwxr-xr-x.  2 flink flink 4096 Jul 11 05:23 blobStore-122d93f5-35c9-4d8a-9632-e0e65f766825
drwxr-xr-x. 14 flink flink 4096 Jul 11 07:23 blobStore-c7d3433b-8e6d-4195-a431-d9392b638b5f
-rw-r--r--.  1 flink flink    4 Jul 11 05:23 flink--taskexecutor.pid
drwxr-xr-x.  2 flink flink 4096 Jul 11 05:23 flink-dist-cache-08e706f9-a388-4a6d-8774-849207746783
drwxr-xr-x.  2 flink flink 4096 Jul 11 05:23 flink-io-287519fb-4cc8-4396-9072-584e3fae0dcc
drwxr-xr-x.  2 flink flink 4096 Jul 11 05:23 hsperfdata_flink
drwxr-xr-x.  2 root  root  4096 May 16 12:54 hsperfdata_root
-rw-r--r--.  1 flink flink 1179 Jul 11 05:23 jaas-4321856842187934442.conf
drwxr-xr-x.  2 flink flink 4096 Jul 11 07:17 localState

There is no sign of any RocksDB directory. Or is RocksDB not being used at all?
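A minimal diagnostic sketch (not from the thread) for checking whether any RocksDB working directories exist below a TaskManager's temp directory. It assumes, without confirmation here, that with state.backend.rocksdb.localdir unset the RocksDB instance directories are created lazily inside the TaskManager's temporary I/O directories (for example the flink-io-* directory listed above), only while a job with keyed state is running, and that their names start with "job_". The root and the prefix are therefore parameters; FindRocksDbDirs and findDirs are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FindRocksDbDirs {

    // Returns every directory at most maxDepth levels below root whose name starts with prefix.
    public static List<Path> findDirs(Path root, String prefix, int maxDepth) throws IOException {
        try (Stream<Path> walk = Files.walk(root, maxDepth)) {
            return walk
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName() != null
                            && p.getFileName().toString().startsWith(prefix))
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumed defaults: /tmp as the TaskManager temp directory and "job_" as the
        // RocksDB instance-directory prefix; pass other values as arguments if needed.
        Path root = Paths.get(args.length > 0 ? args[0] : "/tmp");
        String prefix = args.length > 1 ? args[1] : "job_";
        for (Path dir : findDirs(root, prefix, 3)) {
            System.out.println(dir);
        }
    }
}
```

Run it on the TaskManager while the job is running, for example "java FindRocksDbDirs /tmp job_". Files.walk fails with an UncheckedIOException if it reaches a directory it cannot read, so point it at the Flink temp directory rather than a system-wide root.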



On Tue, Jul 10, 2018 at 12:45 PM, Sampath Bhat <[hidden email]> wrote:
Chesnay - why is the absolute-path check required in RocksDBStateBackend.setDbStoragePaths(String... paths)? I think this is causing the issue. It's not related to GlusterFS or the file system: the same problem can be reproduced on a local machine with the following configuration and a Flink application that has checkpointing enabled. We get the IllegalArgumentException ("Relative paths are not supported").

state.backend: rocksdb
state.checkpoints.dir: file:///home/demo/checkpoints/ext_checkpoints
state.savepoints.dir: file:///home/demo/checkpoints/checkpoints/savepoints
state.backend.fs.checkpointdir: file:///home/demo/checkpoints/checkpoints/fs_state
#state.backend.rocksdb.checkpointdir: file:///home/demo/checkpoints/checkpoints/rocksdb_state
state.backend.rocksdb.localdir: /home/demo/checkpoints/checkpoints/rocksdb_state

Any insights would be helpful.

On Wed, Jul 4, 2018 at 2:27 PM, Chesnay Schepler <[hidden email]> wrote:
Reference: https://issues.apache.org/jira/browse/FLINK-9739


On 04.07.2018 10:46, Chesnay Schepler wrote:
It's not really path-parsing logic, but path handling, I suppose; see RocksDBStateBackend#setDbStoragePaths().

I went ahead and converted that method into a simple test method; maybe this is enough to debug the issue.

I assume this regression was caused by FLINK-6557, which refactored the state backend to rely on Java Files instead of Flink paths.
I'll open a JIRA to document it.

The deprecation notice is not a problem.

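import java.io.File;
import java.net.URI;

import org.apache.flink.core.fs.Path;

// Wrapper class so the method compiles on its own (needs flink-core on the classpath).
public class RocksDbPathTest {

   // Standalone copy of the path checks in RocksDBStateBackend#setDbStoragePaths():
   // throws IllegalArgumentException for an empty list, a null entry, a non-local
   // scheme, or a path that is not absolute.
   public static void testPaths(String... paths) {
      if (paths.length == 0) {
         throw new IllegalArgumentException("empty paths");
      }

      File[] pp = new File[paths.length];

      for (int i = 0; i < paths.length; i++) {
         final String rawPath = paths[i];
         final String path;

         if (rawPath == null) {
            throw new IllegalArgumentException("null path");
         }

         // we need this for backwards compatibility, to allow URIs like 'file:///'...
         URI uri = null;
         try {
            uri = new Path(rawPath).toUri();
         }
         catch (Exception e) {
            // cannot parse as a path
         }

         if (uri != null && uri.getScheme() != null) {
            if ("file".equalsIgnoreCase(uri.getScheme())) {
               // keep only the path part of a file:// URI
               path = uri.getPath();
            }
            else {
               throw new IllegalArgumentException("Path " + rawPath + " has a non-local scheme");
            }
         }
         else {
            // no scheme: use the string as a plain file-system path
            path = rawPath;
         }

         pp[i] = new File(path);
         if (!pp[i].isAbsolute()) { // my suspicion is that this categorically fails for GlusterFS paths
            throw new IllegalArgumentException("Relative paths are not supported");
         }
      }
   }
}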
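A JDK-only variant of the check above, runnable without Flink on the classpath. It replaces Flink's Path with java.net.URI (an assumption: the two may differ on edge cases) and adds a hypothetical split of the configured value on ',' or File.pathSeparator. The split tests one hypothesis that this thread does not confirm: if the configured string were split that way before reaching setDbStoragePaths, the "file" scheme of file:///home/abc/share would become an entry of its own and be rejected as relative, even though the whole URI passes the check. LocalDirSplitCheck, isAcceptedLocalPath and rejectedEntries are illustrative names.

```java
import java.io.File;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

public class LocalDirSplitCheck {

    // Same decision as testPaths(), but returns false instead of throwing, and parses
    // with java.net.URI instead of org.apache.flink.core.fs.Path (assumed equivalent here).
    public static boolean isAcceptedLocalPath(String rawPath) {
        String path = rawPath;
        try {
            URI uri = new URI(rawPath);
            if (uri.getScheme() != null) {
                if (!"file".equalsIgnoreCase(uri.getScheme())) {
                    return false; // non-local scheme such as hdfs://
                }
                path = uri.getPath();
                if (path == null) {
                    return false; // opaque URI such as file:foo has no path part
                }
            }
        } catch (URISyntaxException e) {
            // not a valid URI: fall back to treating it as a plain path
        }
        return new File(path).isAbsolute();
    }

    // Hypothetical pre-processing: split the configured value on ',' or File.pathSeparator
    // (':' on Unix) and return the entries that isAcceptedLocalPath would reject.
    public static List<String> rejectedEntries(String configuredValue) {
        List<String> rejected = new ArrayList<>();
        for (String entry : configuredValue.split(",|" + File.pathSeparator)) {
            if (!isAcceptedLocalPath(entry)) {
                rejected.add(entry);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        String value = args.length > 0 ? args[0] : "file:///home/abc/share";
        System.out.println("whole value accepted: " + isAcceptedLocalPath(value));
        System.out.println("rejected after split: " + rejectedEntries(value));
    }
}
```

On a Unix-like system, where File.pathSeparator is ":", main prints that the whole value is accepted and that [file] is rejected after the split. A plain absolute path such as the one in Sampath's configuration produces no rejected entries under this hypothesis.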


On 03.07.2018 16:35, Jash, Shaswata (Nokia - IN/Bangalore) wrote:

Hello Chesnay,

 

A cluster-wide (Kubernetes) checkpointing directory on a GlusterFS volume mount (hence the file:/// access scheme) was working fine for us up to 1.4.2, so we would like to understand where the breakage happened in 1.5.0.

Could you please point me to the relevant source files for the RocksDB "custom file path" parsing logic? We would be interested in investigating this.

I also observed the following in the log:

Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'
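The warning above suggests that the deprecated state.backend.rocksdb.checkpointdir value is still read as the RocksDB local directory when state.backend.rocksdb.localdir is absent, which would route a file:/// value (such as the rocksdb_state entry in the configuration logged further down) into the local-directory check. A minimal sketch of that fallback, assuming the current key wins when both are set; the parser handles only flat "key: value" lines and is not a YAML parser. EffectiveRocksDbLocalDir and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class EffectiveRocksDbLocalDir {

    // Key names as they appear in the deprecation warning.
    static final String LOCAL_DIR_KEY = "state.backend.rocksdb.localdir";
    static final String DEPRECATED_KEY = "state.backend.rocksdb.checkpointdir";

    // Parses flat "key: value" lines only; blank lines and '#' comments are skipped.
    // Values may contain ':' (as in file:///...), so only the first ": " separates key and value.
    public static Map<String, String> parseFlatConf(String text) {
        Map<String, String> conf = new HashMap<>();
        for (String line : text.split("\\R")) {
            String trimmed = line.trim();
            if (trimmed.isEmpty() || trimmed.startsWith("#")) {
                continue;
            }
            int sep = trimmed.indexOf(": ");
            if (sep > 0) {
                conf.put(trimmed.substring(0, sep).trim(), trimmed.substring(sep + 2).trim());
            }
        }
        return conf;
    }

    // Assumed precedence: the current key wins; the deprecated key is only a fallback.
    // Returns null when neither key is set.
    public static String effectiveLocalDir(Map<String, String> conf) {
        String value = conf.get(LOCAL_DIR_KEY);
        return value != null ? value : conf.get(DEPRECATED_KEY);
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "state.backend: rocksdb",
                "state.backend.rocksdb.checkpointdir: file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state");
        // Prints the deprecated key's value, because the current key is not set.
        System.out.println(effectiveLocalDir(parseFlatConf(conf)));
    }
}
```

With the sample input, main prints file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state. A commented-out line such as "#state.backend.rocksdb.checkpointdir: ..." is skipped entirely.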

Regards,

Shaswata

 

From: Chesnay Schepler [[hidden email]]
Sent: Tuesday, July 03, 2018 5:52 PM
To: Data Engineer [hidden email]
Cc: [hidden email]
Subject: Re: Checkpointing in Flink 1.5.0

 

The code appears to be working fine.

This may happen because you're using a GlusterFS volume.
The RocksDBStateBackend uses Java Files internally (NOT nio Paths), which AFAIK only work properly against the plain local file system.

The GlusterFS nio FileSystem implementation also explicitly does not support conversions to File.
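A small JDK-only illustration of the Path-to-File limitation described above: java.nio.file.Path.toFile() throws UnsupportedOperationException for paths that do not belong to the default file-system provider. No GlusterFS provider is used here; the JDK's zip file-system provider stands in for any non-default provider, and ToFileCheck is a hypothetical name.

```java
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;

public class ToFileCheck {

    // Returns true if the nio Path can be converted to a java.io.File.
    public static boolean convertibleToFile(Path path) {
        try {
            path.toFile();
            return true;
        } catch (UnsupportedOperationException e) {
            return false; // path belongs to a non-default provider
        }
    }

    public static void main(String[] args) throws Exception {
        // A temp file on the default (local) provider converts fine.
        Path local = Files.createTempFile("local", ".txt");
        System.out.println("default provider: " + convertibleToFile(local));

        // A path inside a zip archive, opened through the JDK zip provider, does not.
        Path zip = Files.createTempDirectory("zipfs").resolve("demo.zip");
        URI uri = URI.create("jar:" + zip.toUri());
        try (FileSystem zipFs = FileSystems.newFileSystem(uri, Collections.singletonMap("create", "true"))) {
            System.out.println("zip provider: " + convertibleToFile(zipFs.getPath("/inside.txt")));
        }
    }
}
```

main prints true for the temporary local file and false for the path inside the zip archive.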

On 03.07.2018 13:53, Chesnay Schepler wrote:

Thanks. It looks like RocksDBStateBackend.setDbStoragePaths has some custom file path parsing logic; I will probe it a bit to see what the issue is.

On 03.07.2018 13:45, Data Engineer wrote:

2018-07-03 11:30:35,703 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Starting StandaloneSessionClusterEntrypoint (Version: <unknown>, Rev:c61b108, Date:24.05.2018 @ 16:54:44 CEST)
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  OS current user: flink
2018-07-03 11:30:35,705 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Current Hadoop/Kerberos user: <no hadoop dependency found>
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b10
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Maximum heap size: 981 MiBytes
2018-07-03 11:30:35,706 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JAVA_HOME: /etc/alternatives/jre_openjdk/
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  No Hadoop Dependency available
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  JVM Options:
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xms1024m
2018-07-03 11:30:35,707 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Xmx1024m
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog.file=/opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlog4j.configuration=file:/opt/flink-1.5.0/conf/log4j.properties
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     -Dlogback.configurationFile=file:/opt/flink-1.5.0/conf/logback.xml
2018-07-03 11:30:35,708 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Program Arguments:
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --configDir
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     /opt/flink-1.5.0/conf
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --executionMode
2018-07-03 11:30:35,709 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     --host
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -     cluster
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         -  Classpath: /opt/flink-1.5.0/lib/flink-cep_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-connectors-1.5.0.jar:/opt/flink-1.5.0/lib/flink-gelly_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-ml_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-python_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/flink-table_2.11-1.5.0.jar:/opt/flink-1.5.0/lib/log4j-1.2.17.jar:/opt/flink-1.5.0/lib/slf4j-log4j12-1.7.7.jar:/opt/flink-1.5.0/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-03 11:30:35,710 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - --------------------------------------------------------------------------------
2018-07-03 11:30:35,712 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 4124
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, myfl-flink-jobmanager
2018-07-03 11:30:35,720 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 4123
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.rpc.port, 4122
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.data.port, 4121
2018-07-03 11:30:35,721 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.query.port, 4125
2018-07-03 11:30:35,722 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.address, myfl-flink-jobmanager-ui
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, rocksdb
2018-07-03 11:30:35,762 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, file:///opt/flink/share/myfl-flink/checkpoints/ext_checkpoints
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.fs.checkpointdir, file:///opt/flink/share/myfl-flink/checkpoints/fs_state
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend.rocksdb.checkpointdir, file:///opt/flink/share/myfl-flink/checkpoints/rocksdb_state
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 4
2018-07-03 11:30:35,763 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy, fixed-delay
2018-07-03 11:30:35,764 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.fixed-delay.attempts, 100
2018-07-03 11:30:35,764 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: restart-strategy.fixed-delay.delay, 1 s
2018-07-03 11:30:35,885 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Starting StandaloneSessionClusterEntrypoint.
2018-07-03 11:30:35,885 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install default filesystem.
2018-07-03 11:30:35,892 INFO  org.apache.flink.core.fs.FileSystem                           - Hadoop is not in the classpath/dependencies. The extended set of supported File Systems via Hadoop is not available.
2018-07-03 11:30:35,963 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Install security context.
2018-07-03 11:30:35,970 INFO  org.apache.flink.runtime.security.modules.HadoopModuleFactory  - Cannot create Hadoop Security Module because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,988 INFO  org.apache.flink.runtime.security.SecurityUtils               - Cannot install HadoopSecurityContext because Hadoop cannot be found in the Classpath.
2018-07-03 11:30:35,989 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Initializing cluster services.
2018-07-03 11:30:36,003 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Trying to start actor system at myfl-flink-jobmanager:4123
2018-07-03 11:30:37,288 INFO  akka.event.slf4j.Slf4jLogger                                  - Slf4jLogger started
2018-07-03 11:30:37,396 INFO  akka.remote.Remoting                                          - Starting remoting
2018-07-03 11:30:37,583 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://flink@myfl-flink-jobmanager:4123]
2018-07-03 11:30:37,591 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Actor system started at akka.tcp://flink@myfl-flink-jobmanager:4123
2018-07-03 11:30:37,611 INFO  org.apache.flink.runtime.blob.BlobServer                      - Created BLOB server storage directory /tmp/blobStore-e445bc66-cee3-4a3d-b810-74df02627eca
2018-07-03 11:30:37,613 INFO  org.apache.flink.runtime.blob.BlobServer                      - Started BLOB server at 0.0.0.0:4124 - max concurrent requests: 50 - max backlog: 1000
2018-07-03 11:30:37,629 INFO  org.apache.flink.runtime.metrics.MetricRegistryImpl           - No metrics reporter configured, no metrics will be exposed/reported.
2018-07-03 11:30:37,664 INFO  org.apache.flink.runtime.dispatcher.FileArchivedExecutionGraphStore  - Initializing FileArchivedExecutionGraphStore: Storage directory /tmp/executionGraphStore-4ff546b1-4bfb-4911-9314-89c61d7e7149, expiration time 3600000, maximum cache size 52428800 bytes.
2018-07-03 11:30:37,694 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Created BLOB cache storage directory /tmp/blobStore-7e0efdb8-f70b-42ed-9387-c0e1b8090b36
2018-07-03 11:30:37,702 WARN  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Upload directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload does not exist, or has been deleted externally. Previously uploaded files are no longer available.
2018-07-03 11:30:37,703 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Created directory /tmp/flink-web-e68a12b9-b9cc-4508-be00-4bf9f113afcd/flink-web-upload for file uploads.
2018-07-03 11:30:37,706 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Starting rest endpoint.
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component log file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.log
2018-07-03 11:30:38,369 INFO  org.apache.flink.runtime.webmonitor.WebMonitorUtils           - Determined location of main cluster component stdout file: /opt/flink-1.5.0/log/flink--standalonesession-0-myfl-flink-jobmanager-7b4d8c4dd4-bv6zf.out
2018-07-03 11:30:38,567 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Rest endpoint listening at myfl-flink-jobmanager-ui:8081
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - http://myfl-flink-jobmanager-ui:8081 was granted leadership with leaderSessionID=00000000-0000-0000-0000-000000000000
2018-07-03 11:30:38,568 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Web frontend listening at http://myfl-flink-jobmanager-ui:8081.
2018-07-03 11:30:38,578 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.resourcemanager.StandaloneResourceManager at akka://flink/user/resourcemanager .
2018-07-03 11:30:38,966 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.dispatcher.StandaloneDispatcher at akka://flink/user/dispatcher .
2018-07-03 11:30:39,068 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - ResourceManager akka.tcp://flink@myfl-flink-jobmanager:4123/user/resourcemanager was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,069 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Starting the SlotManager.
2018-07-03 11:30:39,164 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Dispatcher akka.tcp://flink@myfl-flink-jobmanager:4123/user/dispatcher was granted leadership with fencing token 00000000000000000000000000000000
2018-07-03 11:30:39,165 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Recovering all persisted jobs.
2018-07-03 11:30:39,682 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - Replacing old registration of TaskExecutor 068c693b9585900f68c53b00507ee889.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Unregister TaskManager 8a5cee3aa38081030dc8558ac477d3b3 from the SlotManager.
2018-07-03 11:30:39,683 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  - The target with resource ID 068c693b9585900f68c53b00507ee889 is already been monitored.
2018-07-03 11:30:39,770 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Registering TaskManager 068c693b9585900f68c53b00507ee889 under 03d409e5166fad4f4082b6165eb0de2e at the SlotManager.
2018-07-03 11:34:20,257 INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job b684656d9afd75cc384a7bcd071bf55e (CSV Files Read -> CSV to Avro encode -> Kafka publish).
2018-07-03 11:34:20,269 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/jobmanager_0 .
2018-07-03 11:34:20,278 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,285 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647, delayBetweenRestartAttempts=0) for CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,289 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Starting RPC endpoint for org.apache.flink.runtime.jobmaster.slotpool.SlotPool at akka://flink/user/67ceb2ae-1cb1-44be-a09e-601032e23fb5 .
2018-07-03 11:34:20,481 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job recovers via failover strategy: full graph restart
2018-07-03 11:34:20,562 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job CSV Files Read -> CSV to Avro encode -> Kafka publish (b684656d9afd75cc384a7bcd071bf55e).
2018-07-03 11:34:20,563 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Successfully ran initialization on master in 0 ms.
2018-07-03 11:34:20,580 INFO  org.apache.flink.runtime.jobmaster.JobMaster                  - Loading state backend via factory org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2018-07-03 11:34:20,590 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'state.backend.rocksdb.checkpointdir' instead of proper key 'state.backend.rocksdb.localdir'
2018-07-03 11:34:20,592 ERROR org.apache.flink.runtime.rest.handler.job.JobSubmitHandler    - Implementation error: Unhandled exception.
org.apache.flink.util.FlinkException: Failed to submit job b684656d9afd75cc384a7bcd071bf55e.
  at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
  at akka.actor.ActorCell.invoke(ActorCell.scala:495)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
  at akka.dispatch.Mailbox.run(Mailbox.scala:224)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
  at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
  at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
  at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
  at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
  at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
  at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
  ... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate configured state backend
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:308)
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
  at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
  at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
  at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
  at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
  ... 26 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Invalid configuration for RocksDB state backend's local storage directories: Relative paths are not supported
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:273)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.configure(RocksDBStateBackend.java:296)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:47)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory.createFromConfig(RocksDBStateBackendFactory.java:32)
  at org.apache.flink.runtime.state.StateBackendLoader.loadStateBackendFromConfig(StateBackendLoader.java:157)
  at org.apache.flink.runtime.state.StateBackendLoader.fromApplicationOrConfigOrDefault(StateBackendLoader.java:222)
  at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:304)
  ... 31 more
Caused by: java.lang.IllegalArgumentException: Relative paths are not supported
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.setDbStoragePaths(RocksDBStateBackend.java:518)
  at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.<init>(RocksDBStateBackend.java:269)
  ... 37 more

 

 
