getting an error when configuring state backend to hdfs

classic Classic list List threaded Threaded
8 messages Options
Reply | Threaded
Open this post in threaded view
|

getting an error when configuring state backend to hdfs

avilevi
Hi,
I am trying to set the backend state to hdfs 
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried them with different combinations)  :
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

any help will be greatly appreciated 

Reply | Threaded
Open this post in threaded view
|

Re: getting an error when configuring state backend to hdfs

Chesnay Schepler
Are you including the filesystems in your jar? Filesystem jars must be placed in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:
Hi,
I am trying to set the backend state to hdfs 
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried them with different combinations)  :
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

any help will be greatly appreciated 


Reply | Threaded
Open this post in threaded view
|

Re: getting an error when configuring state backend to hdfs

avilevi
Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt assembly). which jar I should place in the /lib directory ?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler <[hidden email]> wrote:
Are you including the filesystems in your jar? Filesystem jars must be placed in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:
Hi,
I am trying to set the backend state to hdfs 
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried them with different combinations)  :
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

any help will be greatly appreciated 


Reply | Threaded
Open this post in threaded view
|

Re: getting an error when configuring state backend to hdfs

Chesnay Schepler
flink-hadoop-fs should be in /lib

On 19.12.2018 16:44, Avi Levi wrote:
Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt assembly). which jar I should place in the /lib directory ?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler <[hidden email]> wrote:
Are you including the filesystems in your jar? Filesystem jars must be placed in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:
Hi,
I am trying to set the backend state to hdfs 
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried them with different combinations)  :
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

any help will be greatly appreciated 



Reply | Threaded
Open this post in threaded view
|

Re: getting an error when configuring state backend to hdfs

Steven Nelson
In reply to this post by avilevi
What image are you using?

Sent from my iPhone

On Dec 19, 2018, at 9:44 AM, Avi Levi <[hidden email]> wrote:

Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt assembly). which jar I should place in the /lib directory ?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler <[hidden email]> wrote:
Are you including the filesystems in your jar? Filesystem jars must be placed in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:
Hi,
I am trying to set the backend state to hdfs 
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried them with different combinations)  :
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

any help will be greatly appreciated 


Reply | Threaded
Open this post in threaded view
|

Re: getting an error when configuring state backend to hdfs

avilevi
when I try running from my IDE (intellij) I am getting this exception 
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.bluevoyant.StreamingJob$.delayedEndpoint$com$bluevoyant$StreamingJob$1(StreamingJob.scala:41)
at com.bluevoyant.StreamingJob$delayedInit$body.apply(StreamingJob.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.bluevoyant.StreamingJob$.main(StreamingJob.scala:15)
at com.bluevoyant.StreamingJob.main(StreamingJob.scala)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1166)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1146)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more


On Wed, Dec 19, 2018 at 6:50 PM Steven Nelson <[hidden email]> wrote:
What image are you using?

Sent from my iPhone

On Dec 19, 2018, at 9:44 AM, Avi Levi <[hidden email]> wrote:

Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt assembly). which jar I should place in the /lib directory ?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler <[hidden email]> wrote:
Are you including the filesystems in your jar? Filesystem jars must be placed in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:
Hi,
I am trying to set the backend state to hdfs 
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried them with different combinations)  :
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

any help will be greatly appreciated 


Reply | Threaded
Open this post in threaded view
|

Re: getting an error when configuring state backend to hdfs

Yun Tang
Hi Avi

For application running in your IDE, please set the checkpoint path schema as "file://", you could refer to source code of ITcases using rocksDBStateBackend.

For application running in your cluster, please choose Flink with Hadoop to download, or choose Flink without hadoop and export your HADOOP_CLASSPATH [1]



Best
Yun Tang

From: Avi Levi <[hidden email]>
Sent: Thursday, December 20, 2018 2:11
To: Steven Nelson
Cc: Chesnay Schepler; [hidden email]
Subject: Re: getting an error when configuring state backend to hdfs
 
when I try running from my IDE (intellij) I am getting this exception 
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.bluevoyant.StreamingJob$.delayedEndpoint$com$bluevoyant$StreamingJob$1(StreamingJob.scala:41)
at com.bluevoyant.StreamingJob$delayedInit$body.apply(StreamingJob.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.bluevoyant.StreamingJob$.main(StreamingJob.scala:15)
at com.bluevoyant.StreamingJob.main(StreamingJob.scala)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1166)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1146)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more


On Wed, Dec 19, 2018 at 6:50 PM Steven Nelson <[hidden email]> wrote:
What image are you using?

Sent from my iPhone

On Dec 19, 2018, at 9:44 AM, Avi Levi <[hidden email]> wrote:

Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt assembly). which jar I should place in the /lib directory ?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler <[hidden email]> wrote:
Are you including the filesystems in your jar? Filesystem jars must be placed in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:
Hi,
I am trying to set the backend state to hdfs 
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried them with different combinations)  :
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

any help will be greatly appreciated 


Reply | Threaded
Open this post in threaded view
|

Re: getting an error when configuring state backend to hdfs

avilevi
Thanks !
My apology for my late response. all good advices  
I did put the flink-hadoop-fs jar in /lib as Chesnay suggested.  and from the IDE simply use file:// as Yun suggested 

On Mon, Dec 24, 2018 at 6:32 AM Yun Tang <[hidden email]> wrote:
Hi Avi

For application running in your IDE, please set the checkpoint path schema as "file://", you could refer to source code of ITcases using rocksDBStateBackend.

For application running in your cluster, please choose Flink with Hadoop to download, or choose Flink without hadoop and export your HADOOP_CLASSPATH [1]



Best
Yun Tang

From: Avi Levi <[hidden email]>
Sent: Thursday, December 20, 2018 2:11
To: Steven Nelson
Cc: Chesnay Schepler; [hidden email]
Subject: Re: getting an error when configuring state backend to hdfs
 
when I try running from my IDE (intellij) I am getting this exception 
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.bluevoyant.StreamingJob$.delayedEndpoint$com$bluevoyant$StreamingJob$1(StreamingJob.scala:41)
at com.bluevoyant.StreamingJob$delayedInit$body.apply(StreamingJob.scala:15)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.bluevoyant.StreamingJob$.main(StreamingJob.scala:15)
at com.bluevoyant.StreamingJob.main(StreamingJob.scala)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1166)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1146)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more


On Wed, Dec 19, 2018 at 6:50 PM Steven Nelson <[hidden email]> wrote:
What image are you using?

Sent from my iPhone

On Dec 19, 2018, at 9:44 AM, Avi Levi <[hidden email]> wrote:

Hi Chesnay,
What do you mean? I am creating a fat jar with all dependencies (using sbt assembly). which jar I should place in the /lib directory ?

On Wed, Dec 19, 2018 at 4:44 PM Chesnay Schepler <[hidden email]> wrote:
Are you including the filesystems in your jar? Filesystem jars must be placed in the /lib directory of the flink distribution.

On 19.12.2018 15:03, Avi Levi wrote:
Hi,
I am trying to set the backend state to hdfs 
val stateUri = "hdfs/path_to_dir"
val backend: RocksDBStateBackend = new RocksDBStateBackend(stateUri, true)
env.setStateBackend(backend)

I am running with flink 1.7.0 with the following dependencies (tried them with different combinations)  :
"org.apache.flink"    %% "flink-connector-filesystem"         % flinkV
"org.apache.flink"    % "flink-hadoop-fs"                     % flinkV
"org.apache.hadoop"   % "hadoop-hdfs"                         % hadoopVersion
"org.apache.hadoop"   % "hadoop-common"                       % hadoopVersion

however when running the jar I am getting this error:

Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:403)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createCheckpointStorage(RocksDBStateBackend.java:407)
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
... 17 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:64)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
... 23 more

any help will be greatly appreciated