Hi,
I am using Flink version 1.7.2, and I am trying to use the S3-like object storage EMC ECS (https://www.emc.com/techpubs/ecs/ecs_s3_supported_features-1.htm).

I am using the flink-s3-fs-hadoop-1.7.2.jar file as the dependency for the S3 file system. I have placed it under the lib folder, so it is on Flink's classpath.

My flink-conf.yaml looks like this:

    s3.endpoint: SU73ECSG1P1d.***.COM:9021
    s3.access-key: vdna_np_user
    security.ssl.rest.enabled: false
    web.timeout: 10000
    s3.secret-key: J***

And my code for the state backend is like this:

    env.setStateBackend(new FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))

I have a bucket called aip_featuretoolkit in my S3 instance, and I can connect to S3 from the S3 command-line utilities. However, I cannot checkpoint with this configuration in Flink. I get the following error message:

    Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Could not retrieve JobResult.
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:643)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
        at com.visa.flink.job.DruidStreamJob$.runJob(DruidStreamJob.scala:62)
        at com.visa.flink.cli.CliFlinkDruid.run(CliFlinkDruid.scala:19)
        at com.visa.flink.cli.Main$.main(Main.scala:22)
        at com.visa.flink.cli.Main.main(Main.scala)
    Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        ... 4 more
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
        at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
        ... 7 more
    Caused by: java.lang.RuntimeException: Failed to start checkpoint ID counter: null uri host. This can be caused by unencoded / in the password string
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:255)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:498)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:345)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
        at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1166)
        at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1146)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:296)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
        ... 10 more
    Caused by: java.io.IOException: null uri host. This can be caused by unencoded / in the password string
        at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:159)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:395)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
        at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.<init>(FsCheckpointStorage.java:58)
        at org.apache.flink.runtime.state.filesystem.FsStateBackend.createCheckpointStorage(FsStateBackend.java:444)
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:249)
        ... 17 more
    Caused by: java.lang.NullPointerException: null uri host. This can be caused by unencoded / in the password string
        at java.util.Objects.requireNonNull(Objects.java:228)
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3native.S3xLoginHelper.buildFSURI(S3xLoginHelper.java:69)
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.setUri(S3AFileSystem.java:467)
        at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:234)
        at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:124)
        ... 23 more

It seems that the way I set up the state backend causes this exception, i.e.

    env.setStateBackend(new FsStateBackend("s3://aip_featuretoolkit/checkpoints/"))

How can I resolve this issue? Are S3-like object stores supported by 1.7.2?

Thanks,
Vishwas
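For context on the flink-conf.yaml entries in this message: as far as we understand it, flink-s3-fs-hadoop forwards keys with the s3., s3a. or fs.s3a. prefix to Hadoop's S3A client under the fs.s3a. prefix, and mirrors the hyphenated access-key/secret-key spellings onto S3A's dotted names. The minimal Scala sketch below approximates that mapping; the prefix list, the mirrored pairs, and the names S3ConfigMirroring, hadoopKeyFor and toHadoopConf are assumptions for illustration, not Flink's source (the real loader is also believed to keep the original key next to the mirrored one). It shows that s3.endpoint is passed through verbatim, so a port such as :9021 has to be part of that value, while unrelated keys like web.timeout are not forwarded.

```scala
// Hypothetical sketch, not Flink's source: approximates how flink-s3-fs-hadoop
// is assumed to forward flink-conf.yaml keys to Hadoop's S3A client.
object S3ConfigMirroring {

  // Flink-side prefixes assumed to be rewritten to the S3A prefix
  private val flinkPrefixes = Seq("s3.", "s3a.", "fs.s3a.")
  private val hadoopPrefix = "fs.s3a."

  // Hyphenated Flink spellings assumed to map onto S3A's dotted key names
  private val mirroredKeys = Map(
    "fs.s3a.access-key" -> "fs.s3a.access.key",
    "fs.s3a.secret-key" -> "fs.s3a.secret.key"
  )

  /** S3A key for a Flink key, or None if the key is not an S3 setting. */
  def hadoopKeyFor(key: String): Option[String] =
    flinkPrefixes.find(p => key.startsWith(p)).map { prefix =>
      val hadoopKey = hadoopPrefix + key.stripPrefix(prefix)
      mirroredKeys.getOrElse(hadoopKey, hadoopKey)
    }

  /** Keeps only S3 settings, renamed to their assumed S3A keys; values are unchanged. */
  def toHadoopConf(flinkConf: Map[String, String]): Map[String, String] =
    flinkConf.toSeq.flatMap { case (key, value) =>
      hadoopKeyFor(key).map(hadoopKey => hadoopKey -> value).toSeq
    }.toMap

  def main(args: Array[String]): Unit = {
    val flinkConf = Map(
      "s3.endpoint" -> "SU73ECSG1P1d.***.COM:9021",
      "s3.access-key" -> "vdna_np_user",
      "web.timeout" -> "10000"
    )
    // The endpoint value, including its port, is copied through unchanged
    toHadoopConf(flinkConf).toSeq.sorted.foreach { case (k, v) => println(s"$k = $v") }
  }
}
```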
Hi Vishwas,
It might be that you’ve got a legacy bucket name (“aip_featuretoolkit”), as AWS no longer allows bucket names to contain an underscore.

I’m guessing that the Hadoop S3 code is trying to treat your path as a valid URI, but the bucket name doesn’t conform, and thus you get the “null uri host” issue.

Could you try with a compliant bucket name?

— Ken
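Ken's URI explanation can be checked directly with java.net.URI. The minimal Scala sketch below assumes, as the S3xLoginHelper.buildFSURI frame in the trace suggests, that S3A takes the bucket from the URI's host. Because '_' is not a legal hostname character, java.net.URI falls back to a registry-based authority and getHost returns null, which is consistent with the "null uri host" message. The bucket-name check is a simplified assumption covering only the basic S3 rules (3 to 63 characters; lowercase letters, digits, dots and hyphens; starting and ending with a letter or digit); BucketHostCheck and its methods are hypothetical names.

```scala
import java.net.URI
import java.util.regex.Pattern

// Sketch under stated assumptions: S3A is assumed to read the bucket from
// URI.getHost, as the S3xLoginHelper.buildFSURI frame in the trace suggests.
object BucketHostCheck {

  /** Host component of the path, or None when java.net.URI cannot parse a host. */
  def hostOf(path: String): Option[String] = Option(new URI(path).getHost)

  // Simplified S3 bucket-name rule: 3-63 chars of lowercase letters, digits,
  // '.' and '-', starting and ending with a letter or digit. Not exhaustive.
  private val bucketNamePattern = Pattern.compile("[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")

  /** True if the whole name matches the simplified rule above. */
  def isCompliantBucketName(name: String): Boolean =
    bucketNamePattern.matcher(name).matches()

  def main(args: Array[String]): Unit = {
    val paths = Seq("s3://aip_featuretoolkit/checkpoints/", "s3://aip-featuretoolkit/checkpoints/")
    for (path <- paths) {
      // '_' is not a legal hostname character, so the first path has no host
      println(s"$path -> host = ${hostOf(path)}")
    }
  }
}
```

Running the same check on a bucket name before creating the bucket is a cheap way to catch names that the S3 client cannot turn into a host.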
--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
Hi Ken,
Thanks for reaching out. I created a compliant bucket named aip-featuretoolkit. I now get the exception "Unable to execute HTTP request: aip-featuretoolkit.SU73ECSG1P1d.***.COM: Name or service not known" from org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.class, line 56.

Here is my config from the flink-conf.yaml file:

    s3.endpoint: SU73ECSG1P1d.***.COM
    s3.access-key: vdna_np_user
    security.ssl.rest.enabled: false
    web.timeout: 10000
    s3.secret-key: J***

I have not supplied the port in the config file. Does it internally use 9021?

Also, I am running my application as a different user from the one specified in s3.access-key. Does that matter?

Thanks,
Vishwas

On Thu, Jun 20, 2019 at 5:06 PM Ken Krugler <[hidden email]> wrote:
> On Jun 20, 2019, at 2:46 PM, Vishwas Siravara <[hidden email]> wrote:
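The host in the "Name or service not known" error is the bucket name prepended to the endpoint, which is how virtual-hosted-style S3 addressing forms request URLs; path-style addressing keeps the endpoint as the host and puts the bucket in the path instead. The minimal Scala sketch below only builds the two URL shapes; S3AddressingStyle and requestUrl are hypothetical names, not S3A code. Two hedged observations follow from it. First, Flink's S3 documentation describes s3.path.style.access: true for S3-compatible stores without virtual-host-style addressing (assumed to be forwarded to S3A's fs.s3a.path.style.access). Second, an endpoint without a port yields a URL without one, so the client would presumably use the scheme's default port (443 for HTTPS) rather than 9021 unless the port stays in s3.endpoint.

```scala
// Hypothetical helper, not S3A code: builds the two S3 request URL shapes.
object S3AddressingStyle {

  /** Request URL for an object under either addressing style. */
  def requestUrl(endpoint: String, bucket: String, key: String,
                 pathStyle: Boolean, https: Boolean = true): String = {
    val scheme = if (https) "https" else "http"
    if (pathStyle) s"$scheme://$endpoint/$bucket/$key" // bucket in the path
    else s"$scheme://$bucket.$endpoint/$key"           // bucket in the hostname
  }

  def main(args: Array[String]): Unit = {
    val endpoint = "SU73ECSG1P1d.***.COM" // endpoint as configured in the thread, no port
    // Virtual-hosted style: DNS must resolve the bucket-prefixed hostname
    println(requestUrl(endpoint, "aip-featuretoolkit", "checkpoints/", pathStyle = false))
    // Path style: only the endpoint host itself has to resolve
    println(requestUrl(endpoint, "aip-featuretoolkit", "checkpoints/", pathStyle = true))
  }
}
```

With many on-premises S3-compatible stores, no wildcard DNS entry exists for bucket-prefixed hostnames, which is why path-style access is the usual choice there.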