Hi,
I am pretty new to this. I keep struggling to read a file from S3 and I am getting this weird exception: Caused by: java.lang.NumberFormatException: For input string: "64M" (if anyone can link me to a working GitHub example, that would be awesome). What am I doing wrong? This is how my code looks: import org.apache.flink.api.scala.createTypeInformation
sbt dependencies:
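(For illustration only, not the actual build from this thread: one plausible sbt setup for a Flink Scala job that reads Parquet files from S3 might look like the sketch below; the artifact list and versions are assumptions.)

// Illustrative sketch only -- versions and the exact artifact list are assumptions.
val flinkVersion = "1.12.2"
libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-clients"         % flinkVersion,
  // Parquet/Hadoop for reading the files; S3 access itself is usually better
  // provided through the flink-s3-fs-hadoop plugin (see the replies below)
  // than through application-level Hadoop S3 dependencies.
  "org.apache.parquet" % "parquet-hadoop" % "1.11.1",
  "org.apache.hadoop"  % "hadoop-client"  % "3.2.1"
)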
Can you show us the full exception stacktrace? Intuitively I would think your cluster configuration contains an invalid value for some memory configuration option.
On 3/4/2021 4:45 PM, Avi Levi wrote:
Sure, this is the full exception stacktrace:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:238)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
    at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
    at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:531)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
    at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:517)
    at akka.actor.Actor.aroundReceive$(Actor.scala:515)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    ... 4 more
Caused by: java.lang.NumberFormatException: For input string: "64M"
    at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:68)
    at java.base/java.lang.Long.parseLong(Long.java:707)
    at java.base/java.lang.Long.parseLong(Long.java:832)
    at org.apache.hadoop.conf.Configuration.getLong(Configuration.java:1563)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:248)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3414)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:158)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3474)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3442)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
    at com.neura.ParquetSourceFunction.run(Job.scala:45)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)

On Thu, Mar 4, 2021 at 6:02 PM Chesnay Schepler <[hidden email]> wrote:
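(As an aside on the trace above: the innermost failure is Hadoop's Configuration.getLong choking on the value "64M" while the S3A filesystem is being initialized. The thread does not confirm which configuration key carries that value; fs.s3a.multipart.size is used below only as an assumption, but the parse failure itself can be reproduced in isolation like this:)

import org.apache.hadoop.conf.Configuration

object SizeSuffixRepro {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // "64M" is a human-readable size; which S3A key carried it in the
    // failing job is an assumption here.
    conf.set("fs.s3a.multipart.size", "64M")

    // getLongBytes understands the K/M/G suffixes...
    println(conf.getLongBytes("fs.s3a.multipart.size", 0L)) // 67108864

    // ...while getLong goes through Long.parseLong and throws
    // java.lang.NumberFormatException: For input string: "64M",
    // matching the stack trace above.
    println(conf.getLong("fs.s3a.multipart.size", 0L))
  }
}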
Does anyone by any chance have a working example (without the credentials etc., of course) that can be shared on GitHub? Simply reading/writing a file from/to S3. I keep struggling with this one and getting weird exceptions.
Thanks
On Thu, Mar 4, 2021 at 7:30 PM Avi Levi <[hidden email]> wrote:
Hey Avi,
Do you use the 'Hadoop S3 plugin' to read from S3?
If yes, what is its version?
If not, try to read from S3 as follows (ref).
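(For illustration, a minimal DataStream job that reads a text file from S3 might look like the sketch below; it assumes the flink-s3-fs-hadoop plugin is available to the cluster and credentials are configured, e.g. in flink-conf.yaml, and the bucket and path are placeholders.)

import org.apache.flink.streaming.api.scala._

object S3ReadExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // The s3:// scheme is served by Flink's S3 filesystem plugin,
    // not by application-level Hadoop dependencies.
    env
      .readTextFile("s3://my-bucket/path/to/file.txt")
      .print()
    env.execute("read from s3 example")
  }
}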
Tamir
I had a typo in my previous answer; the env variable name was missing an 'S':
ENABLE_BUILT_IN_PLUGIN --> ENABLE_BUILT_IN_PLUGINS
Once again, the value is the plugin jar name: flink-s3-fs-hadoop-<flink-version>.jar
The complete list can be found here.
You can build your own Flink image and set the environment variable in it, or set it when you run the container.
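(For example, assuming the official Flink Docker image, whose entrypoint honors this variable; the image tag is a placeholder:)

# Hedged example: enable the bundled S3 plugin when starting the container.
docker run \
  -e ENABLE_BUILT_IN_PLUGINS=flink-s3-fs-hadoop-<flink-version>.jar \
  flink:<flink-version> jobmanager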
If you execute it locally (not in a container) on a standalone cluster, make sure this environment variable is defined at the system level.
Tamir.
Thanks Tamir, I was having some issues connecting from my IDE (solved), but this is really helpful.
On Sat, Mar 6, 2021, 23:04 Tamir Sagi <[hidden email]> wrote: