Flink 1.5 batch job fails to start

Flink 1.5 batch job fails to start

Alex Vinnik
I'm trying to migrate an existing job that runs fine on Flink 1.3.1 to Flink 1.5, and I'm getting a weird exception.

The job reads JSON from s3a and writes Parquet files to s3a using an Avro model. It is packaged as an uber jar built with hadoop-aws-2.8.0 in order to have access to the S3AFileSystem class.

It fails here with
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data

To be exact, it fails right on that line.

I'm not sure how to resolve this problem and would appreciate some advice. Let me know if more info is needed. The full stack trace is below. Thanks.

org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 29 more
Caused by: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more
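The innermost frames show where this happens: TaskConfig.getStubWrapper reads the serialized OutputFormat out of the task configuration through InstantiationUtil.readObjectFromConfig and deserializeObject, which use java.io.ObjectInputStream. The JDK-only sketch below illustrates that kind of round trip. It is not Flink's implementation, and DemoOutputFormat and ClassLoaderObjectInputStream are hypothetical names. The point it shows is that the reading side resolves every class in the stream by name through its own class loader, so if it picks up a different version of a class than the one that wrote the bytes (for example, a different Hadoop release), decoding can fail.

```java
import java.io.*;

public class OutputFormatRoundTrip {

    /** Hypothetical stand-in for a serializable OutputFormat that carries its settings. */
    static class DemoOutputFormat implements Serializable {
        private static final long serialVersionUID = 1L;
        final String outputPath;

        DemoOutputFormat(String outputPath) {
            this.outputPath = outputPath;
        }
    }

    /** ObjectInputStream that resolves every class in the stream through an explicit class loader. */
    static class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                // The reading side decides which version of the class is used.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup (it also handles primitive type descriptors).
                return super.resolveClass(desc);
            }
        }
    }

    /** Client side: turn the object into bytes. */
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reading side (the JobManager, in the real system): rebuild the object with the given loader. */
    @SuppressWarnings("unchecked")
    static <T> T deserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return (T) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class not visible to the reading side", e);
        }
    }

    public static void main(String[] args) {
        byte[] shipped = serialize(new DemoOutputFormat("s3a://bar"));
        DemoOutputFormat restored = deserialize(shipped, OutputFormatRoundTrip.class.getClassLoader());
        System.out.println("Restored output path: " + restored.outputPath);
    }
}
```

In this toy case both sides share one class loader, so the round trip succeeds; in a cluster, the writer and the reader can see different jars.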

Re: Flink 1.5 batch job fails to start

Till Rohrmann
Hi Alex,

I'm not entirely sure what causes this problem, because this is the first time I've seen it.

My first question would be whether the problem also arises when using a different Hadoop version.

Are you using the same Java version on the client and on the server?
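To compare the two sides, the same details can be printed on the client and on the server. The sketch below formats the running JVM's name, vendor, specification version and VM version through the standard java.lang.management API, in the same shape as the "JVM:" line in the Flink entrypoint log (for example "OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11"). We assume, but have not verified, that Flink builds its line the same way; the class name JvmInfo is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;

public class JvmInfo {

    /** Describes the running JVM as "vmName - vmVendor - specVersion/vmVersion". */
    static String describeJvm() {
        RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
        return bean.getVmName() + " - " + bean.getVmVendor() + " - "
                + bean.getSpecVersion() + "/" + bean.getVmVersion();
    }

    public static void main(String[] args) {
        // Run this on both machines and compare the output.
        System.out.println("JVM: " + describeJvm());
        System.out.println("java.version: " + System.getProperty("java.version"));
    }
}
```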

Could you provide us with the cluster entrypoint logs?

Cheers,
Till

On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <[hidden email]> wrote:

Re: Flink 1.5 batch job fails to start

Alex Vinnik
Hi Till,

Thanks for responding. The entrypoint logs are below. One thing I noticed is the line "Hadoop version: 2.7.3", while my fat jar is built with the Hadoop 2.8 client. Could that be the reason for the error? If so, how can I use the same Hadoop version (2.8) on the Flink server side?

BTW, the job runs fine locally, reading from the same s3a buckets, when executed with createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar
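One way to confirm which Hadoop each side actually uses is to ask the JVM directly. The sketch below uses reflection so it compiles without Hadoop on the build path. org.apache.hadoop.util.VersionInfo.getVersion() is a real Hadoop API; the class and helper names are hypothetical. Running the helpers both in the local java -jar run and in code executing on the cluster shows the Hadoop version and the jar each class is loaded from.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

public class HadoopOnClasspath {

    /** Hadoop version reported by org.apache.hadoop.util.VersionInfo, or null if Hadoop is absent. */
    static String hadoopVersion(ClassLoader loader) {
        try {
            Class<?> versionInfo = Class.forName("org.apache.hadoop.util.VersionInfo", true, loader);
            Method getVersion = versionInfo.getMethod("getVersion");
            return (String) getVersion.invoke(null);
        } catch (ReflectiveOperationException | LinkageError e) {
            return null;
        }
    }

    /** Jar or directory a class was loaded from, "bootstrap" for JDK core classes, null if not found. */
    static String loadedFrom(String className, ClassLoader loader) {
        try {
            Class<?> cls = Class.forName(className, false, loader);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            if (source == null || source.getLocation() == null) {
                return "bootstrap";
            }
            return source.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = HadoopOnClasspath.class.getClassLoader();
        System.out.println("Hadoop version: " + hadoopVersion(loader));
        System.out.println("Configuration from: " + loadedFrom("org.apache.hadoop.conf.Configuration", loader));
        System.out.println("S3AFileSystem from: " + loadedFrom("org.apache.hadoop.fs.s3a.S3AFileSystem", loader));
    }
}
```

If the two environments print different versions or different jars for org.apache.hadoop.conf.Configuration, the client and the cluster are not using the same Hadoop classes.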

Regarding the Java version: the job is submitted via the Flink UI, so it should not be a problem.

Thanks a lot in advance.

2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap size: 1963 MiBytes
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME: /docker-java-home/jre
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.7.3
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program Arguments:
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --configDir
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      /opt/flink/conf
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --executionMode
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install default filesystem.
2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install security context.
2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing cluster services.
2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123
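The two log lines that matter here are "Hadoop version:" and "Classpath:". As a small, hypothetical helper (not part of Flink), the sketch below pulls both out of a saved entrypoint log, which makes it easy to see that the cluster reports Hadoop 2.7.3 and which Hadoop-related jars sit in /opt/flink/lib. It assumes the log has been copied to a local file passed as the first argument.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EntrypointLogScan {

    // Matches e.g. "... INFO   Hadoop version: 2.7.3" and captures "2.7.3".
    private static final Pattern HADOOP_VERSION = Pattern.compile("Hadoop version:\\s*(\\S+)");

    /** Returns the Hadoop version reported in the log, if any line contains one. */
    static Optional<String> hadoopVersion(List<String> logLines) {
        for (String line : logLines) {
            Matcher m = HADOOP_VERSION.matcher(line);
            if (m.find()) {
                return Optional.of(m.group(1));
            }
        }
        return Optional.empty();
    }

    /** Returns entries of the "Classpath:" line whose file name contains the keyword. */
    static List<String> classpathEntries(List<String> logLines, String keyword) {
        List<String> matches = new ArrayList<>();
        for (String line : logLines) {
            int idx = line.indexOf("Classpath:");
            if (idx < 0) {
                continue;
            }
            String classpath = line.substring(idx + "Classpath:".length()).trim();
            for (String entry : classpath.split(":")) {
                String fileName = entry.substring(entry.lastIndexOf('/') + 1);
                if (!entry.isEmpty() && fileName.contains(keyword)) {
                    matches.add(entry);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: EntrypointLogScan <entrypoint-log-file>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println("Cluster Hadoop version: " + hadoopVersion(lines).orElse("not reported"));
        System.out.println("Hadoop jars on cluster classpath: " + classpathEntries(lines, "hadoop"));
    }
}
```

For the log above, this would report version 2.7.3 and the single entry /opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar.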

On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <[hidden email]> wrote:

Re: Flink 1.5 batch job fails to start

vino yang
Hi Alex,

Based on your log information, the likely cause is the Hadoop version. To rule out an exception caused by mismatched Hadoop versions, I suggest you make the Hadoop versions on both sides match.

You can:

1. Upgrade the Hadoop version that the Flink cluster depends on; Flink's official website provides a binary bundled with Hadoop 2.8.[1]
2. Downgrade your fat jar's Hadoop client dependency to match the Hadoop version of the Flink cluster.
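Either option amounts to making one version string equal the other. A minimal, hypothetical pre-flight check is sketched below; it compares only the major.minor part (2.7 vs 2.8), which is a coarse heuristic we chose for illustration. Patch releases can still differ in behavior, so requiring an exact match is the more conservative choice.

```java
public class HadoopVersionMatch {

    /** Extracts "major.minor" from a version such as "2.7.3" or "2.8.0-SNAPSHOT". */
    static String majorMinor(String version) {
        String[] parts = version.trim().split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Unexpected version format: " + version);
        }
        return parts[0] + "." + parts[1];
    }

    /** Heuristic: true if cluster and client Hadoop versions are on the same major.minor line. */
    static boolean compatible(String clusterVersion, String clientVersion) {
        return majorMinor(clusterVersion).equals(majorMinor(clientVersion));
    }

    public static void main(String[] args) {
        String cluster = "2.7.3"; // "Hadoop version" reported in the entrypoint log
        String client = "2.8.0";  // hadoop-aws version the fat jar was built with
        if (!compatible(cluster, client)) {
            System.out.println("Mismatch: cluster runs Hadoop " + cluster
                    + " but the fat jar bundles " + client + "; align one side with the other.");
        }
    }
}
```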


Thanks, vino.




Re: Flink 1.5 batch job fails to start

Alex Vinnik
Vino,

I upgraded Flink to Hadoop 2.8.1:

$ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep entrypoint | grep 'Hadoop version'
2018-07-25T00:19:46.142+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.8.1

but the job still fails to start.

Any ideas?

Caused by: org.apache.flink.util.FlinkException: Failed to submit job d84cccd3bffcba1f243352a5e5ef99a9.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more





Re: Flink 1.5 batch job fails to start

vino yang
Hi Alex,

Is it possible that the data has been corrupted? 

Or have you confirmed that the Avro version is consistent across the different Flink versions? 

Also, if you stay on Flink 1.3.1 instead of upgrading, does the job still run?
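One way to check the Avro question (the same check works for Hadoop) is to list every classpath location that provides a given class file. This is a minimal sketch using only the JDK's `ClassLoader.getResources`. The class name `ClasspathDuplicates` is hypothetical, and the probed resources are assumptions. More than one hit means two copies of the class, possibly of different versions, are visible to the same loader.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ClasspathDuplicates {

    /**
     * Returns every location visible to {@code loader} that provides the
     * resource, e.g. "org/apache/avro/Schema.class".
     */
    static List<String> findLocations(ClassLoader loader, String resourcePath) {
        try {
            List<String> locations = new ArrayList<>();
            for (URL url : Collections.list(loader.getResources(resourcePath))) {
                locations.add(url.toString());
            }
            return locations;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Hypothetical probe list: classes that may be bundled both in the
        // job's fat jar and in Flink's lib/ directory.
        String[] probes = {
            "org/apache/avro/Schema.class",
            "org/apache/hadoop/conf/Configuration.class"
        };
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (String probe : probes) {
            List<String> hits = findLocations(loader, probe);
            System.out.println(probe + " -> " + hits.size() + " location(s)");
            hits.forEach(h -> System.out.println("    " + h));
        }
    }
}
```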


Thanks, vino.


2018-07-25 8:32 GMT+08:00 Alex Vinnik <[hidden email]>:
Vino,

I upgraded the Flink cluster to Hadoop 2.8.1:

$ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep entrypoint | grep 'Hadoop version'
2018-07-25T00:19:46.142+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.8.1

but the job still fails to start.

Ideas?

Caused by: org.apache.flink.util.FlinkException: Failed to submit job d84cccd3bffcba1f243352a5e5ef99a9.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more
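The original post says the fat jar was built with hadoop-aws-2.8.0, while this log shows the cluster now running 2.8.1. That means the two sides may still differ at the patch level, assuming the rest of the Hadoop client in the jar is also 2.8.0. Whether a patch-level difference alone can break deserialization has not been established in this thread. The sketch below only makes the comparison explicit. It assumes the entrypoint log format shown above, and `HadoopVersionCheck` is a hypothetical name.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HadoopVersionCheck {

    // Matches the "Hadoop version: X.Y.Z" line printed by ClusterEntrypoint.
    private static final Pattern HADOOP_VERSION =
        Pattern.compile("Hadoop version:\\s*(\\S+)");

    /** Extracts the Hadoop version from an entrypoint log line, if present. */
    static Optional<String> versionFromLogLine(String line) {
        Matcher m = HADOOP_VERSION.matcher(line);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** True if the major and minor components are equal (patch is ignored). */
    static boolean sameMinor(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        return x.length >= 2 && y.length >= 2
            && x[0].equals(y[0]) && x[1].equals(y[1]);
    }

    public static void main(String[] args) {
        String logLine = "2018-07-25T00:19:46.142+0000 "
            + "[org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.8.1";
        String client = "2.8.0"; // hadoop-aws version from the original post (assumed for all Hadoop jars)
        String cluster = versionFromLogLine(logLine).orElse("unknown");
        // Prints: client=2.8.0 cluster=2.8.1 exactMatch=false sameMinor=true
        System.out.println("client=" + client + " cluster=" + cluster
            + " exactMatch=" + client.equals(cluster)
            + " sameMinor=" + sameMinor(client, cluster));
    }
}
```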


On Tue, Jul 24, 2018 at 10:32 AM vino yang <[hidden email]> wrote:
Hi Alex,

Based on your logs, the likely cause is the Hadoop version: the exception may come from the client and the cluster using different Hadoop versions. I suggest you make the Hadoop versions on both sides match.

You can:

1. Upgrade the Hadoop version the Flink cluster depends on; Flink's official website provides a binary bundled with Hadoop 2.8.[1]
2. Downgrade the Hadoop client dependency in your fat jar to match the Flink cluster's Hadoop version.
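To confirm which Hadoop version is actually visible on a given side, code running there can call Hadoop's `org.apache.hadoop.util.VersionInfo.getVersion()`. This minimal sketch calls it reflectively so that the snippet compiles and runs even without Hadoop on the classpath. `RuntimeHadoopVersion` is a hypothetical name, and the sketch leaves open where to call it from (for example the job's `main()`).

```java
import java.lang.reflect.Method;
import java.util.Optional;

public class RuntimeHadoopVersion {

    /**
     * Returns the version reported by org.apache.hadoop.util.VersionInfo,
     * or empty if no Hadoop classes are visible to {@code loader}.
     */
    static Optional<String> hadoopVersion(ClassLoader loader) {
        try {
            Class<?> versionInfo =
                Class.forName("org.apache.hadoop.util.VersionInfo", true, loader);
            Method getVersion = versionInfo.getMethod("getVersion"); // static method
            return Optional.ofNullable((String) getVersion.invoke(null));
        } catch (ClassNotFoundException e) {
            return Optional.empty(); // Hadoop not on this classpath
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("VersionInfo present but not callable", e);
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println("Hadoop visible here: " + hadoopVersion(loader).orElse("none"));
    }
}
```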


Thanks, vino.

2018-07-24 22:59 GMT+08:00 Alex Vinnik <[hidden email]>:
Hi Till,

Thanks for responding. Below are the entrypoint logs. One thing I noticed is "Hadoop version: 2.7.3", while my fat jar is built with the Hadoop 2.8 client. Could that be the reason for the error? If so, how can I use the same Hadoop version (2.8) on the Flink server side? BTW, the job runs fine locally, reading from the same s3a buckets, when executed with createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar
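To see whether a Hadoop class is loaded from the fat jar or from Flink's `lib/` directory (for instance `flink-shaded-hadoop2-uber-1.5.0.jar`), you can print the code source of the loaded class. This sketch uses the JDK's `ProtectionDomain`/`CodeSource`. `WhichJar` is a hypothetical name, and the probed class names are assumptions. JDK bootstrap classes report no code source.

```java
import java.security.CodeSource;

public class WhichJar {

    /** Returns the jar or directory a class was loaded from, or a marker string. */
    static String locationOf(String className, ClassLoader loader) {
        try {
            Class<?> cls = Class.forName(className, false, loader);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // JDK bootstrap classes have no CodeSource.
            return (source == null || source.getLocation() == null)
                ? "<bootstrap or unknown>"
                : source.getLocation().toString();
        } catch (ClassNotFoundException e) {
            return "<not found>";
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        String[] probes = {
            "org.apache.hadoop.conf.Configuration",
            "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "org.apache.hadoop.mapreduce.Job"
        };
        for (String name : probes) {
            System.out.println(name + " -> " + locationOf(name, loader));
        }
    }
}
```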

Regarding the Java version: the job is submitted via the Flink UI, so it should not be a problem. 

Thanks a lot in advance.

2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap size: 1963 MiBytes
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME: /docker-java-home/jre
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.7.3
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program Arguments:
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --configDir
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      /opt/flink/conf
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --executionMode
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install default filesystem.
2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install security context.
2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing cluster services.
2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123
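The classpath line in these logs shows where the cluster's Hadoop comes from: `flink-shaded-hadoop2-uber-1.5.0.jar` in `/opt/flink/lib`. The jar name does not give the Hadoop version; the "Hadoop version: 2.7.3" line does. This small sketch pulls the Hadoop-related jars out of a logged classpath string. `HadoopJarsOnClasspath` is a hypothetical name, and matching on the substring "hadoop" is only a heuristic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class HadoopJarsOnClasspath {

    /** Splits a ':'-separated classpath and keeps entries whose file name mentions hadoop. */
    static List<String> hadoopJars(String classpath) {
        return Arrays.stream(classpath.split(":"))
            .filter(entry -> !entry.isEmpty())
            .filter(entry -> {
                String file = entry.substring(entry.lastIndexOf('/') + 1);
                return file.toLowerCase(Locale.ROOT).contains("hadoop");
            })
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Classpath copied from the entrypoint log above.
        String cp = "/opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:"
            + "/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:"
            + "/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:"
            + "/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::";
        // Prints: [/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar]
        System.out.println(hadoopJars(cp));
    }
}
```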

On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <[hidden email]> wrote:
Hi Alex,

I'm not entirely sure what causes this problem because this is the first time I've seen it. 

My first question would be whether the problem also arises with a different Hadoop version.

Are you using the same Java version on the client and on the server?
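The failing step is plain Java deserialization of the serialized OutputFormat. That is what the `ObjectInputStream` frames under `InstantiationUtil.deserializeObject` show. This minimal sketch reproduces that step outside Flink: serialize an object, then read it back through an explicitly chosen class loader. `SerializationRoundTrip` is a hypothetical name, and the `ArrayList` is only a stand-in for the job's `HadoopOutputFormat`. One assumed way to test the mismatch theory is to read the bytes back through a loader that sees only the cluster's `lib/` jars, such as a `URLClassLoader` over `/opt/flink/lib`. This is a suggestion, not something confirmed in the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;

public class SerializationRoundTrip {

    /** ObjectInputStream that resolves classes through an explicit class loader. */
    static final class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup (e.g. for primitive types).
                return super.resolveClass(desc);
            }
        }
    }

    /** Serializes with Java serialization, then reads the bytes back through {@code loader}. */
    static Object roundTrip(Serializable obj, ClassLoader loader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new LoaderObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), loader)) {
                // "unread block data" is an IllegalStateException, so it would
                // propagate from here unchanged.
                return in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class not visible to loader: " + e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        ArrayList<String> sample = new ArrayList<>(Arrays.asList("a", "b"));
        Object copy = roundTrip(sample, Thread.currentThread().getContextClassLoader());
        System.out.println("round trip ok: " + sample.equals(copy));
    }
}
```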

Could you provide us with the cluster entrypoint logs?

Cheers,
Till





Re: Flink 1.5 batch job fails to start

Till Rohrmann
Hi Alex,

could you share with us the full logs of the client and the cluster entrypoint? That would be tremendously helpful.

Cheers,
Till

On Wed, Jul 25, 2018 at 4:08 AM vino yang <[hidden email]> wrote:
Hi Alex,

Is it possible that the data has been corrupted? 

Or have you confirmed that the avro version is consistent in different Flink versions? 

Also, if you don't upgrade Flink and still use version 1.3.1, can it be recovered?


Thanks, vino.


2018-07-25 8:32 GMT+08:00 Alex Vinnik <[hidden email]>:
Vino,

Upgraded flink to Hadoop 2.8.1

$ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep entrypoint | grep 'Hadoop version'
2018-07-25T00:19:46.142+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.8.1

but job still fails to start

Ideas?

Caused by: org.apache.flink.util.FlinkException: Failed to submit job d84cccd3bffcba1f243352a5e5ef99a9.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more


On Tue, Jul 24, 2018 at 10:32 AM vino yang <[hidden email]> wrote:
Hi Alex,

Based on your log information, the potential reason is Hadoop version. To troubleshoot the exception comes from different Hadoop version. I suggest you match the both side of Hadoop version.

You can : 

1. Upgrade the Hadoop version which Flink Cluster depends on, Flink's official website provides the binary binding Hadoop 2.8.[1]
2. downgrade your fat jar's Hadoop client dependency's version to match Flink Cluster's hadoop dependency's version.


Thanks, vino.

2018-07-24 22:59 GMT+08:00 Alex Vinnik <[hidden email]>:
Hi Till,

Thanks for responding. Below is entrypoint logs. One thing I noticed that "Hadoop version: 2.7.3". My fat jar is built with hadoop 2.8 client. Could it be a reason for that error? If so how can i use same hadoop version 2.8 on flink server side?  BTW job runs fine locally reading from the same s3a buckets when executed using createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar

Regarding java version. The job is submitted via Flink UI, so it should not be a problem. 

Thanks a lot in advance.

2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap size: 1963 MiBytes
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME: /docker-java-home/jre
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.7.3
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program Arguments:
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --configDir
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      /opt/flink/conf
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --executionMode
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install default filesystem.
2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install security context.
2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing cluster services.
2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123

On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <[hidden email]> wrote:
Hi Alex,

I'm not entirely sure what causes this problem because it is the first time I see it. 

My first question is whether the problem also occurs with a different Hadoop version.
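One way to check which Hadoop build a JVM actually picks up is to ask where a Hadoop class was loaded from. The following is a minimal sketch with a hypothetical helper (`ClassOrigin` is illustrative and not part of Flink or Hadoop). It assumes the helper runs with the same classpath as the job, for example bundled into the fat jar, and `org.apache.hadoop.conf.Configuration` is only an example class name:

```java
import java.net.URL;
import java.security.CodeSource;

// Hypothetical diagnostic helper: reports which jar (if any) a class was loaded from.
public class ClassOrigin {

    /** Returns the code-source location of the named class, or a marker string. */
    public static String originOf(String className, ClassLoader loader) {
        try {
            // initialize = false: only locate the class, do not run static initializers.
            Class<?> cls = Class.forName(className, false, loader);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes from the bootstrap loader (e.g. java.lang.String) have no code source.
            if (source == null) {
                return "<bootstrap or unknown>";
            }
            URL location = source.getLocation();
            return location == null ? "<bootstrap or unknown>" : location.toString();
        } catch (ClassNotFoundException e) {
            return "<not found>";
        }
    }

    public static void main(String[] args) {
        // Example class name only; pass other fully qualified names as arguments.
        String[] names = args.length > 0
                ? args
                : new String[] {"org.apache.hadoop.conf.Configuration"};
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        for (String name : names) {
            System.out.println(name + " -> " + originOf(name, loader));
        }
    }
}
```

If the same class name resolves to different jars on the client and on the cluster, that may indicate that more than one Hadoop version is on the classpath.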

Are you using the same Java version on the client and on the server?
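For a quick side-by-side comparison, a small program can print the standard JVM identification properties. A minimal sketch, assuming it is run once on the client machine and once on the JobManager host (the `JvmInfo` class name is hypothetical):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects standard JVM properties so two environments can be compared.
public class JvmInfo {

    /** Returns a few standard system properties in a fixed, stable order. */
    public static Map<String, String> describe() {
        Map<String, String> info = new LinkedHashMap<>();
        String[] keys = {"java.version", "java.vendor", "java.vm.name", "java.vm.version", "java.home"};
        for (String key : keys) {
            info.put(key, System.getProperty(key));
        }
        return info;
    }

    public static void main(String[] args) {
        // One "key = value" line per property, suitable for diffing between machines.
        describe().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Diffing the output from both sides shows at a glance whether the Java versions and vendors match.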

Could you provide us with the cluster entrypoint logs?

Cheers,
Till

On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <[hidden email]> wrote:
I'm trying to migrate an existing job that runs fine on Flink 1.3.1 to Flink 1.5, and I'm getting a strange exception.

The job reads JSON from s3a and writes Parquet files to s3a using an Avro model. It is packaged as an uber jar built with hadoop-aws-2.8.0 so that it has access to the S3AFileSystem class.

It fails here, with:
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data

To be exact, it fails right on that line.

I'm not sure how to resolve this problem and am looking for advice. Let me know if more information is needed. The full stack trace is below. Thanks.

org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 29 more
Caused by: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more




Re: Flink 1.5 batch job fails to start

Alex Vinnik
Hi Till,

Here is the server startup entrypoint log:

2018-07-25T12:19:12.268+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-25T12:19:12.271+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting StandaloneSessionClusterEntrypoint (Version: <unknown>, Rev:3488f8b, Date:10.07.2018 @ 11:51:27 GMT)
2018-07-25T12:19:12.271+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current user: flink
2018-07-25T12:19:18.599+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current Hadoop/Kerberos user: flink
2018-07-25T12:19:18.607+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-25T12:19:18.607+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap size: 1963 MiBytes
2018-07-25T12:19:18.607+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME: /docker-java-home/jre
2018-07-25T12:19:18.615+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.8.1
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dcom.amazonaws.sdk.disableCertChecking
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program Arguments:
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --configDir
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      /opt/flink/conf
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --executionMode
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.1.jar:/opt/flink/lib/flink-metrics-prometheus-1.5.1.jar:/opt/flink/lib/flink-python_2.11-1.5.1.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.5.1.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/peer-group-transform-all.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.1.jar:::
2018-07-25T12:19:18.616+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-25T12:19:18.620+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-25T12:19:18.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting StandaloneSessionClusterEntrypoint.
2018-07-25T12:19:18.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install default filesystem.
2018-07-25T12:19:19.045+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install security context.
2018-07-25T12:19:19.520+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing cluster services.
2018-07-25T12:19:19.601+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to start actor system at flink-jobmanager:6123
2018-07-25T12:19:24.768+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123

Below is the Flink client log:
docker exec -it flink-jobmanager flink run /fat.jar --input-path s3a://json-input --output-path s3a://parquet-output

2018-07-25 13:40:30,752 INFO  org.apache.flink.client.cli.CliFrontend                       - --------------------------------------------------------------------------------
2018-07-25 13:40:30,756 INFO  org.apache.flink.client.cli.CliFrontend                       -  Starting Command Line Client (Version: <unknown>, Rev:3488f8b, Date:10.07.2018 @ 11:51:27 GMT)
2018-07-25 13:40:30,756 INFO  org.apache.flink.client.cli.CliFrontend                       -  OS current user: root
2018-07-25 13:40:31,797 INFO  org.apache.flink.client.cli.CliFrontend                       -  Current Hadoop/Kerberos user: root
2018-07-25 13:40:31,797 INFO  org.apache.flink.client.cli.CliFrontend                       -  JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-25 13:40:31,797 INFO  org.apache.flink.client.cli.CliFrontend                       -  Maximum heap size: 2667 MiBytes
2018-07-25 13:40:31,797 INFO  org.apache.flink.client.cli.CliFrontend                       -  JAVA_HOME: /docker-java-home/jre
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend                       -  Hadoop version: 2.8.1
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend                       -  JVM Options:
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend                       -     -Dlog.file=/opt/flink/log/flink--client-d9831b2552d5.log
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend                       -     -Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
2018-07-25 13:40:31,800 INFO  org.apache.flink.client.cli.CliFrontend                       -     -Dlogback.configurationFile=file:/opt/flink/conf/logback.xml
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       -  Program Arguments:
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       -     run
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       -     /peer-group-transform-all.jar
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       -     --input-path
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       -     s3a://odin-tmp/explore-data/20180711104638/
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       -     --output-path
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       -     s3a://odin-tmp/transformed/20180712/IDA-1917-1
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       -  Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.1.jar:/opt/flink/lib/flink-metrics-prometheus-1.5.1.jar:/opt/flink/lib/flink-python_2.11-1.5.1.jar:/opt/flink/lib/flink-s3-fs-hadoop-1.5.1.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.1.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/peer-group-transform-all.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.1.jar:::
2018-07-25 13:40:31,801 INFO  org.apache.flink.client.cli.CliFrontend                       - --------------------------------------------------------------------------------
2018-07-25 13:40:31,806 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:31,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:31,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:31,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:31,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:31,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2018-07-25 13:40:31,807 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-25 13:40:31,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:31,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:31,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:31,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.java.opts, -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking -Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:31,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2018-07-25 13:40:31,808 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:31,809 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2018-07-25 13:40:31,809 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2018-07-25 13:40:32,151 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to root (auth:SIMPLE)
2018-07-25 13:40:32,190 INFO  org.apache.flink.client.cli.CliFrontend                       - Running 'run' command.
2018-07-25 13:40:32,197 INFO  org.apache.flink.client.cli.CliFrontend                       - Building program from JAR file
2018-07-25 13:40:32,363 WARN  org.apache.flink.configuration.Configuration                  - Config uses deprecated configuration key 'jobmanager.rpc.address' instead of proper key 'rest.address'
2018-07-25 13:40:32,875 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest client endpoint started.
2018-07-25 13:40:32,879 INFO  org.apache.flink.client.cli.CliFrontend                       - Starting execution of program
2018-07-25 13:40:32,879 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Starting program in interactive mode (detached: false)
2018-07-25 13:40:32,943 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,943 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,943 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,943 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,943 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,943 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2018-07-25 13:40:32,944 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-25 13:40:32,944 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,944 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,944 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,944 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.java.opts, -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking -Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,944 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2018-07-25 13:40:32,944 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,944 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2018-07-25 13:40:32,945 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2018-07-25 13:40:32,950 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,950 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,950 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,950 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,950 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,950 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2018-07-25 13:40:32,951 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-25 13:40:32,951 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,951 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,951 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,951 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.java.opts, -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking -Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,951 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2018-07-25 13:40:32,951 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,952 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2018-07-25 13:40:32,952 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2018-07-25 13:40:32,976 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:32,985 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.java.opts, -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking -Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:32,986 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2018-07-25 13:40:32,987 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2018-07-25 13:40:33,212 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class com.fasterxml.jackson.databind.node.ObjectNode does not contain a getter for field _children
2018-07-25 13:40:33,212 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - class com.fasterxml.jackson.databind.node.ObjectNode does not contain a setter for field _children
2018-07-25 13:40:33,212 INFO  org.apache.flink.api.java.typeutils.TypeExtractor             - Class class com.fasterxml.jackson.databind.node.ObjectNode cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2018-07-25 13:40:35,676 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:35,677 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:35,677 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:35,677 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:35,677 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:35,677 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2018-07-25 13:40:35,677 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-25 13:40:35,677 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:35,678 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:35,678 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:35,678 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.java.opts, -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking -Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:35,678 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2018-07-25 13:40:35,678 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:35,678 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2018-07-25 13:40:35,678 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2018-07-25 13:40:35,738 INFO  org.apache.flink.api.java.ExecutionEnvironment                - The job has 6 registered types and 0 default Kryo serializers
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.address, flink-jobmanager
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.rpc.port, 6123
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 1024
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 1024
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: parallelism.default, 1
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: rest.port, 8081
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: jobmanager.heap.mb, 2048
2018-07-25 13:40:53,672 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.heap.mb, 12228
2018-07-25 13:40:53,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: taskmanager.numberOfTaskSlots, 20
2018-07-25 13:40:53,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: env.java.opts, -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking -Dcom.amazonaws.sdk.disableCertChecking
2018-07-25 13:40:53,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.backend, filesystem
2018-07-25 13:40:53,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: state.checkpoints.dir, /tmp/checkpoints/
2018-07-25 13:40:53,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: blob.server.port, 6124
2018-07-25 13:40:53,673 INFO  org.apache.flink.configuration.GlobalConfiguration            - Loading configuration property: query.server.port, 6125
2018-07-25 13:40:53,765 INFO  org.apache.flink.client.program.rest.RestClusterClient        - Submitting job dc8565764f025be19b89ee98c1a398f6 (detached: false).
2018-07-25 13:40:58,885 INFO  org.apache.flink.runtime.rest.RestClient                      - Shutting down rest endpoint.
2018-07-25 13:40:58,887 INFO  org.apache.flink.runtime.rest.RestClient                      - Rest endpoint shutdown complete.
2018-07-25 13:40:58,890 ERROR org.apache.flink.client.cli.CliFrontend                       - Error while running the command.
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result.
at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:257)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:464)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:452)
at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
at com.sailpoint.ida.data.jobs.peergrouptransform.PeerGroupTransformJob.main(PeerGroupTransformJob.java:116)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:420)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:404)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:785)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:279)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:214)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1025)
at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:370)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:214)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 12 more
Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Exception is not retryable.
... 10 more
Caused by: java.util.concurrent.CompletionException: org.apache.flink.runtime.rest.util.RestClientException: [Job submission failed.]
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:953)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
... 4 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Job submission failed.]
at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:309)
at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:293)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
... 5 more

Thanks, Alex


On Wed, Jul 25, 2018 at 2:22 AM Till Rohrmann <[hidden email]> wrote:
Hi Alex,

could you share with us the full logs of the client and the cluster entrypoint? That would be tremendously helpful.

Cheers,
Till

On Wed, Jul 25, 2018 at 4:08 AM vino yang <[hidden email]> wrote:
Hi Alex,

Is it possible that the data has been corrupted?

Or have you confirmed that the Avro version is consistent across the Flink versions?

Also, if you don't upgrade Flink and stay on version 1.3.1, does the job run successfully again?


Thanks, vino.


2018-07-25 8:32 GMT+08:00 Alex Vinnik <[hidden email]>:
Vino,

I upgraded Flink to Hadoop 2.8.1:

$ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep entrypoint | grep 'Hadoop version'
2018-07-25T00:19:46.142+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.8.1

but the job still fails to start.

Ideas?

Caused by: org.apache.flink.util.FlinkException: Failed to submit job d84cccd3bffcba1f243352a5e5ef99a9.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more


On Tue, Jul 24, 2018 at 10:32 AM vino yang <[hidden email]> wrote:
Hi Alex,

Based on your logs, the likely cause is the Hadoop version: the exception may come from the client and the cluster using different Hadoop versions. I suggest you make the Hadoop versions on both sides match.

You can:

1. Upgrade the Hadoop version the Flink cluster depends on; Flink's official website provides binaries bundled with Hadoop 2.8.[1]
2. Downgrade your fat jar's Hadoop client dependency to match the Flink cluster's Hadoop version.
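To confirm which Hadoop version each side actually loads, a check like the following could be run both from the job's main() and inside a task on the cluster. It is a minimal sketch, not part of Flink: `HadoopVersionReport` and its methods are hypothetical names. It relies only on Hadoop's `org.apache.hadoop.util.VersionInfo.getVersion()` (looked up reflectively, so the class still runs when Hadoop is absent) and on `ProtectionDomain.getCodeSource()` to show which jar supplied `org.apache.hadoop.conf.Configuration`.

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.security.CodeSource;

/**
 * Hypothetical diagnostic helper: reports the Hadoop version visible to a
 * class loader and the jar a given class was loaded from. Reflection keeps
 * it compilable and runnable even without Hadoop on the classpath.
 */
public class HadoopVersionReport {

    /** Returns Hadoop's version string, or a short explanation if unavailable. */
    static String hadoopVersion(ClassLoader loader) {
        try {
            Class<?> versionInfo =
                    Class.forName("org.apache.hadoop.util.VersionInfo", true, loader);
            Method getVersion = versionInfo.getMethod("getVersion");
            return (String) getVersion.invoke(null); // static method, no receiver
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        } catch (ReflectiveOperationException | LinkageError e) {
            return "unavailable: " + e;
        }
    }

    /** Returns the jar or directory a class came from, if the JVM reports one. */
    static String codeSourceOf(String className, ClassLoader loader) {
        try {
            Class<?> cls = Class.forName(className, false, loader);
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            URL location = (src == null) ? null : src.getLocation();
            // Bootstrap classes (e.g. java.lang.String) have no code source.
            return (location == null) ? "unknown (no code source)" : location.toString();
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println("Hadoop version: " + hadoopVersion(loader));
        System.out.println("Configuration loaded from: "
                + codeSourceOf("org.apache.hadoop.conf.Configuration", loader));
    }
}
```

If the location printed for `Configuration` is the fat jar on one side and /opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar on the other, the two sides are probably not working with the same Hadoop classes, even when the version strings look close.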


Thanks, vino.

2018-07-24 22:59 GMT+08:00 Alex Vinnik <[hidden email]>:
Hi Till,

Thanks for responding. Below are the entrypoint logs. One thing I noticed is "Hadoop version: 2.7.3", while my fat jar is built with the Hadoop 2.8 client. Could that be the reason for the error? If so, how can I use the same Hadoop version (2.8) on the Flink server side? BTW, the job runs fine locally, reading from the same s3a buckets, when executed using createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar

Regarding the Java version: the job is submitted via the Flink UI, so it should not be a problem.

Thanks a lot in advance.

2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap size: 1963 MiBytes
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME: /docker-java-home/jre
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.7.3
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program Arguments:
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --configDir
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      /opt/flink/conf
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --executionMode
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install default filesystem.
2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install security context.
2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing cluster services.
2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123

On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <[hidden email]> wrote:
Hi Alex,

I'm not entirely sure what causes this problem because this is the first time I've seen it.

My first question would be whether the problem also arises when using a different Hadoop version.

Are you using the same Java versions on the client as well as on the server?

Could you provide us with the cluster entrypoint logs?

Cheers,
Till

On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <[hidden email]> wrote:
Trying to migrate an existing job that runs fine on Flink 1.3.1 to Flink 1.5, and I'm getting a weird exception.

The job reads JSON from s3a and writes Parquet files to s3a with an Avro model. It is an uber jar built with hadoop-aws-2.8.0 in order to have access to the S3AFileSystem class.

It fails here with
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data

To be exact, it fails right on that line.

I'm not sure how to resolve this problem and am looking for advice. Let me know if more info is needed. The full stack trace is below. Thanks.

org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 29 more
Caused by: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more
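The deepest frames show where this breaks: `InstantiationUtil.readObjectFromConfig` hands stored bytes to `java.io.ObjectInputStream`, so the JobManager is deserializing the OutputFormat that the client serialized. The sketch below is a minimal standard-library illustration of such a round trip under that reading of the trace. It is not Flink's `InstantiationUtil`, and `FakeOutputFormat` is a hypothetical stand-in for `HadoopOutputFormat`. The point is that the reading side rebuilds the object with whatever classes its class loader resolves, so the bytes only make sense if those classes are compatible with the ones that wrote them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Minimal sketch of a Java-serialization round trip through a chosen class loader. */
public class SerializationRoundTrip {

    /** Hypothetical stand-in for a user-defined, serializable output format. */
    static class FakeOutputFormat implements Serializable {
        private static final long serialVersionUID = 1L;
        final String outputPath;

        FakeOutputFormat(String outputPath) {
            this.outputPath = outputPath;
        }
    }

    /** Writes the object graph to bytes, as a client would before submission. */
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Reads the bytes back, resolving every class through the given loader. */
    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    // Whichever class this loader returns is what interprets the bytes.
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                    return super.resolveClass(desc); // fall back to the default lookup
                }
            }
        }) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new FakeOutputFormat("s3a://bar"));
        FakeOutputFormat copy = (FakeOutputFormat)
                deserialize(data, SerializationRoundTrip.class.getClassLoader());
        System.out.println("Round trip OK, path = " + copy.outputPath);
    }
}
```

In the failing job, the equivalent of `deserialize` presumably runs on the JobManager with the job's user-code class loader, which is why the Hadoop classes visible there matter.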




Re: Flink 1.5 batch job fails to start

Alex Vinnik
In reply to this post by vino yang
Hi Vino,

The data is OK, I double-checked. The input is plain JSON, and it can be processed by the same code compiled and run on Flink 1.3.1. Thanks for the hint about the Avro and Parquet versions. I synced my fat jar up with the Flink 1.5.1 Avro/Parquet versions. Hopes were high that this would resolve the problem, and one run of the job actually succeeded, but it started failing again after that with the same problem. Weird. I will continue to poke around; it feels like I am so close :)

Best,
-Alex
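Since aligning the Avro/Parquet versions helped once and then the failure returned, it may be worth checking whether more than one copy of those libraries is visible at runtime. A minimal sketch, assuming the job can run extra diagnostic code on the cluster: `DuplicateClassFinder` is a hypothetical helper built on the standard `ClassLoader.getResources`, and the default class names (`org.apache.avro.Schema`, `org.apache.parquet.hadoop.ParquetOutputFormat`) are only examples.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical diagnostic helper: lists every classpath entry that contains
 * a given class file, so duplicate copies of a library become visible.
 */
public class DuplicateClassFinder {

    /** Returns the URLs of all copies of className visible to loader. */
    static List<URL> findCopies(String className, ClassLoader loader) throws IOException {
        String resource = className.replace('.', '/') + ".class";
        // getResources searches the parent loaders as well as this one.
        return Collections.list(loader.getResources(resource));
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        // Example class names for the Avro/Parquet question; pass others as arguments.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.avro.Schema",
            "org.apache.parquet.hadoop.ParquetOutputFormat"
        };
        for (String name : names) {
            List<URL> copies = findCopies(name, loader);
            System.out.println(name + ": " + copies.size() + " copies");
            for (URL url : copies) {
                System.out.println("  " + url);
            }
        }
    }
}
```

More than one URL for a class (for example one inside the fat jar and one under /opt/flink/lib) would mean several copies are on the classpath, and which one is used depends on the class loader's lookup order.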

On Tue, Jul 24, 2018 at 9:08 PM vino yang <[hidden email]> wrote:
Hi Alex,

Is it possible that the data has been corrupted?

Or have you confirmed that the Avro version is consistent across the Flink versions?

Also, if you don't upgrade Flink and stay on version 1.3.1, does the job run successfully again?


Thanks, vino.


2018-07-25 8:32 GMT+08:00 Alex Vinnik <[hidden email]>:
Vino,

I upgraded Flink to Hadoop 2.8.1:

$ docker exec -it flink-jobmanager cat /var/log/flink/flink.log | grep entrypoint | grep 'Hadoop version'
2018-07-25T00:19:46.142+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.8.1

but the job still fails to start.

Ideas?

Caused by: org.apache.flink.util.FlinkException: Failed to submit job d84cccd3bffcba1f243352a5e5ef99a9.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@a3123a9) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more


On Tue, Jul 24, 2018 at 10:32 AM vino yang <[hidden email]> wrote:
Hi Alex,

Based on your log information, the likely cause is a Hadoop version mismatch. To rule out an exception caused by different Hadoop versions, I suggest you make the Hadoop version match on both sides.

You can:

1. Upgrade the Hadoop version that the Flink cluster depends on. Flink's official website provides a binary bundled with Hadoop 2.8.[1]
2. Downgrade your fat jar's Hadoop client dependency to match the Flink cluster's Hadoop version.
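One way to confirm which Hadoop version each side actually sees is to ask Hadoop itself. The sketch below assumes only that hadoop-common's `org.apache.hadoop.util.VersionInfo.getVersion()` is present whenever Hadoop is on the classpath; it loads that class reflectively, so it also runs without Hadoop, and it reports the jar the class was loaded from. The class name `HadoopVersionCheck` is hypothetical. Running it with the fat jar on the classpath and again against Flink's lib/ directory should show whether the two versions differ (2.8.0 in the fat jar vs. 2.7.3 on the cluster, according to this thread).

```java
import java.lang.reflect.Method;
import java.net.URL;
import java.security.CodeSource;

public class HadoopVersionCheck {
    // Lives in hadoop-common; loaded reflectively so this class compiles
    // and runs even when Hadoop is not on the classpath.
    static final String VERSION_INFO = "org.apache.hadoop.util.VersionInfo";

    // Returns the Hadoop version visible to the given class loader,
    // or null if Hadoop's VersionInfo class cannot be loaded.
    static String hadoopVersion(ClassLoader loader) {
        try {
            Class<?> cls = Class.forName(VERSION_INFO, true, loader);
            Method getVersion = cls.getMethod("getVersion");
            return (String) getVersion.invoke(null); // static method, no receiver
        } catch (ReflectiveOperationException | LinkageError e) {
            return null;
        }
    }

    // Returns the jar or directory a class was loaded from,
    // or null if the class is missing or has no code source (e.g. JDK classes).
    static URL sourceOf(String className, ClassLoader loader) {
        try {
            CodeSource src = Class.forName(className, false, loader)
                    .getProtectionDomain().getCodeSource();
            return src == null ? null : src.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = HadoopVersionCheck.class.getClassLoader();
        String version = hadoopVersion(loader);
        System.out.println("Hadoop version: " + (version == null ? "not on classpath" : version));
        System.out.println("VersionInfo loaded from: " + sourceOf(VERSION_INFO, loader));
    }
}
```

When both the fat jar and Flink's lib/ directory contain Hadoop classes, which copy the job actually uses depends on Flink's class loading setup (the `classloader.resolve-order` option), so the check is most informative when run from inside the job itself.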


Thanks, vino.

2018-07-24 22:59 GMT+08:00 Alex Vinnik <[hidden email]>:
Hi Till,

Thanks for responding. Below are the entrypoint logs. One thing I noticed is "Hadoop version: 2.7.3", while my fat jar is built with the Hadoop 2.8 client. Could that be the reason for the error? If so, how can I use the same Hadoop version (2.8) on the Flink server side? BTW, the job runs fine locally, reading from the same s3a buckets, when executed with createLocalEnvironment via java -jar my-fat.jar --input s3a://foo --output s3a://bar

Regarding the Java version: the job is submitted via the Flink UI, so it should not be a problem.

Thanks a lot in advance.

2018-07-24T12:09:38.083+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Starting StandaloneSessionClusterEntrypoint (Version: 1.5.0, Rev:c61b108, Date:24.05.2018 @ 14:54:44 UTC)
2018-07-24T12:09:38.085+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   OS current user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Current Hadoop/Kerberos user: flink
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Maximum heap size: 1963 MiBytes
2018-07-24T12:09:38.844+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JAVA_HOME: /docker-java-home/jre
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Hadoop version: 2.7.3
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   JVM Options:
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xms2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Xmx2048m
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dcom.amazonaws.sdk.disableCertChecking
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5015
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlog4j.configuration=file:/opt/flink/conf/log4j-console.properties
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      -Dlogback.configurationFile=file:/opt/flink/conf/logback-console.xml
2018-07-24T12:09:38.851+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Program Arguments:
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --configDir
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      /opt/flink/conf
2018-07-24T12:09:38.852+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --executionMode
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      --host
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO      cluster
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO   Classpath: /opt/flink/lib/flink-metrics-datadog-1.5.0.jar:/opt/flink/lib/flink-python_2.11-1.5.0.jar:/opt/flink/lib/flink-s3-fs-presto-1.5.0.jar:/opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar:/opt/flink/lib/log4j-1.2.17.jar:/opt/flink/lib/slf4j-log4j12-1.7.7.jar:/opt/flink/lib/flink-dist_2.11-1.5.0.jar:::
2018-07-24T12:09:38.853+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  --------------------------------------------------------------------------------
2018-07-24T12:09:38.854+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Registered UNIX signal handlers for [TERM, HUP, INT]
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Starting StandaloneSessionClusterEntrypoint.
2018-07-24T12:09:38.895+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install default filesystem.
2018-07-24T12:09:38.927+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Install security context.
2018-07-24T12:09:39.034+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Initializing cluster services.
2018-07-24T12:09:39.059+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Trying to start actor system at flink-jobmanager:6123
2018-07-24T12:09:40.335+0000 [org.apache.flink.runtime.entrypoint.ClusterEntrypoint] INFO  Actor system started at akka.tcp://flink@flink-jobmanager:6123
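The `Classpath:` line in this log lists flink-shaded-hadoop2-uber-1.5.0.jar as the only jar in /opt/flink/lib with Hadoop in its name, which is consistent with the "Hadoop version: 2.7.3" line. A minimal sketch for pulling such entries out of a classpath string follows; the class name `HadoopJarsOnClasspath` is hypothetical, and matching on "hadoop" in the file name is only a heuristic that misses Hadoop classes bundled inside differently named jars, including a job's fat jar.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

public class HadoopJarsOnClasspath {
    // Returns the classpath entries whose file name contains "hadoop"
    // (case-insensitive). Naming heuristic only: Hadoop classes bundled
    // inside differently named jars are not detected.
    static List<String> hadoopEntries(String classpath, String separator) {
        List<String> result = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(separator))) {
            if (entry.isEmpty()) {
                continue; // skip empty entries produced by consecutive separators
            }
            int slash = Math.max(entry.lastIndexOf('/'), entry.lastIndexOf('\\'));
            String fileName = entry.substring(slash + 1);
            if (fileName.toLowerCase(Locale.ROOT).contains("hadoop")) {
                result.add(entry);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Pass a classpath string as the first argument, or inspect this JVM's own.
        String classpath = args.length > 0 ? args[0] : System.getProperty("java.class.path");
        for (String entry : hadoopEntries(classpath, File.pathSeparator)) {
            System.out.println(entry);
        }
    }
}
```

Passing the logged `Classpath:` value with `:` as the separator should return only /opt/flink/lib/flink-shaded-hadoop2-uber-1.5.0.jar.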

On Tue, Jul 24, 2018 at 7:16 AM Till Rohrmann <[hidden email]> wrote:
Hi Alex,

I'm not entirely sure what causes this problem because this is the first time I've seen it.

My first question would be whether the problem also arises with a different Hadoop version.

Are you using the same Java version on both the client and the server?

Could you provide us with the cluster entrypoint logs?

Cheers,
Till
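To compare Java versions, the same JVM fields can be printed on the client and on the server. This sketch reads them from the standard `RuntimeMXBean` and formats them to resemble the `JVM:` line in Alex's entrypoint log (`OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.171-b11`); the class name `JvmReport` is hypothetical.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;

public class JvmReport {
    // Builds "<VM name> - <VM vendor> - <JVM spec version>/<VM version>",
    // laid out like the "JVM:" line of the entrypoint log.
    static String jvmSummary() {
        RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
        return bean.getVmName() + " - " + bean.getVmVendor() + " - "
                + bean.getSpecVersion() + "/" + bean.getVmVersion();
    }

    public static void main(String[] args) {
        System.out.println("JVM: " + jvmSummary());
        System.out.println("java.version: " + System.getProperty("java.version"));
    }
}
```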

On Tue, Jul 24, 2018 at 4:56 AM Alex Vinnik <[hidden email]> wrote:
I'm trying to migrate an existing job that runs fine on Flink 1.3.1 to Flink 1.5 and am getting a weird exception.

The job reads JSON from s3a and writes Parquet files to s3a using an Avro model. It is packaged as an uber jar built with hadoop-aws-2.8.0 in order to have access to the S3AFileSystem class.

It fails here with:
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data

To be exact, it fails right on that line.

I'm not sure how to resolve this problem and am looking for advice. Let me know if more info is needed. The full stack trace is below. Thanks.

org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$3(JarRunHandler.java:141)
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:811)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:534)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:20)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:18)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326)
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338)
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911)
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899)
... 29 more
Caused by: org.apache.flink.util.FlinkException: Failed to submit job 13a1478cbc7ec20f93f9ee0947856bfd.
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:162)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:169)
at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:885)
at org.apache.flink.runtime.dispatcher.Dispatcher.createJobManagerRunner(Dispatcher.java:287)
at org.apache.flink.runtime.dispatcher.Dispatcher.runJob(Dispatcher.java:277)
at org.apache.flink.runtime.dispatcher.Dispatcher.persistAndRunJob(Dispatcher.java:262)
at org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:249)
... 21 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSink (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168)': Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:220)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1150)
at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1130)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:298)
at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:151)
... 26 more
Caused by: java.lang.Exception: Deserializing the OutputFormat (org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat@5ebe8168) failed: unread block data
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:63)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:216)
... 31 more
Caused by: java.lang.IllegalStateException: unread block data
at java.io.ObjectInputStream$BlockDataInputStream.setBlockDataMode(ObjectInputStream.java:2781)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1603)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:488)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:475)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:463)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:424)
at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
at org.apache.flink.runtime.jobgraph.OutputFormatVertex.initializeOnMaster(OutputFormatVertex.java:60)
... 32 more