Building Flink from source according to vendor-specific version but causes protobuf conflict


Wei Sun
Hi guys,

Good day.

I rebuilt Flink from source and specified the vendor-specific Hadoop version. It works well when I submit a streaming application without the '-d' (--detached) option, as follows:
bin/flink run -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf

But if I add the '-d' (--detached) option, an 'org.apache.flink.client.deployment.ClusterDeploymentException' is thrown to the CLI, as follows:
bin/flink run -d -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf

-------------------------------- Exception start --------------------------------
 The program finished with the following exception:
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:82)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:238)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$16(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.yarn.AbstractYarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. 
Diagnostics from YARN: Application application_1544777537685_0068 failed 2 times due to AM Container for appattempt_1544777537685_0068_000002 exited with  exitCode: 1
For more detailed output, check application tracking page:http://103-8-200-sh-100-F07.yidian.com:8088/proxy/application_1544777537685_0068/Then, click on links to logs of each attempt.
Diagnostics: Exception from container-launch.
Container id: container_e03_1544777537685_0068_02_000001
Exit code: 1
Stack trace: ExitCodeException exitCode=1: 
at org.apache.hadoop.util.Shell.runCommand(Shell.java:543)
at org.apache.hadoop.util.Shell.run(Shell.java:460)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:720)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:210)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Container exited with a non-zero exit code 1
Failing this attempt. Failing the application.
If log aggregation is enabled on your cluster, use this command to further investigate the issue:
yarn logs -applicationId application_1544777537685_0068
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.startAppMaster(AbstractYarnClusterDescriptor.java:1065)
at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deployInternal(AbstractYarnClusterDescriptor.java:545)
at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:75)
... 9 more
2019-01-07 17:08:55,463 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Cancelling deployment from Deployment Failure Hook
2019-01-07 17:08:55,464 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Killing YARN application
2019-01-07 17:08:55,471 INFO  org.apache.flink.yarn.AbstractYarnClusterDescriptor           - Deleting files in 
-------------------------------- End --------------------------------

My cluster has log aggregation enabled, so I executed the following command:
yarn logs -applicationId application_1544777537685_0068
The aggregated log shows the following detail:
-------------------------------- Exception start --------------------------------
2019-01-07 17:08:49,385 INFO  akka.remote.Remoting                                          - Remoting started; listening on addresses :[akka.tcp://[hidden email]:20998]
2019-01-07 17:08:49,391 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils         - Actor system started at akka.tcp://[hidden email]:20998
2019-01-07 17:08:51,108 INFO  org.apache.flink.runtime.blob.FileSystemBlobStore             - Creating highly available BLOB storage directory at hdfs://amethyst/flink/flink17/ha//application_1544777537685_0068/blob
2019-01-07 17:08:51,138 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shutting YarnJobClusterEntrypoint down with application status FAILED. Diagnostics java.lang.ClassCastException: org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$MkdirsRequestProto cannot be cast to com.google.protobuf.Message
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3075)
at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:3042)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:956)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:952)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:952)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:945)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:61)
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:126)
at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:304)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:261)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$58(ClusterEntrypoint.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:162)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
.
2019-01-07 17:08:51,142 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
2019-01-07 17:08:51,149 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
2019-01-07 17:08:51,151 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
2019-01-07 17:08:51,170 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
2019-01-07 17:08:51,196 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
2019-01-07 17:08:51,196 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Could not start cluster entrypoint YarnJobClusterEntrypoint.
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint.
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:181)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:517)
at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:102)
Caused by: java.lang.ClassCastException: org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$MkdirsRequestProto cannot be cast to com.google.protobuf.Message
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:3075)
at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:3042)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:956)
at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:952)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:952)
at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:945)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
at org.apache.flink.runtime.blob.FileSystemBlobStore.<init>(FileSystemBlobStore.java:61)
at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:126)
at org.apache.flink.runtime.blob.BlobUtils.createBlobStoreFromConfig(BlobUtils.java:92)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createHighAvailabilityServices(HighAvailabilityServicesUtils.java:121)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.createHaServices(ClusterEntrypoint.java:304)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:261)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:207)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$58(ClusterEntrypoint.java:163)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:162)
... 2 more

-------------------------------- End --------------------------------

When I rebuilt Flink from source, I used the command:
mvn clean install -DskipTests -Dhadoop.version=2.6.0-cdh5.5.0

My cluster details:
Hadoop 2.6.0-cdh5.5.0
Subversion http://github.com/cloudera/hadoop -r fd21232cef7b8c1f536965897ce20f50b83ee7b2
Compiled by jenkins on 2015-11-09T20:39Z
Compiled with protoc 2.5.0
From source with checksum 98e07176d1787150a6a9c087627562c
This command was run using /opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/jars/hadoop-common-2.6.0-cdh5.5.0.jar

I am wondering: since I specified the vendor-specific Hadoop version, how did this happen? Is there anything else, or any extra parameters, that should be added?

It would be appreciated if anybody could help me figure it out.


Thank you
Best Regards

Re: Building Flink from source according to vendor-specific version but causes protobuf conflict

Timo Walther
Hi Wei,

Did you play around with the class loading options mentioned here [1]? The -d option might impact how classes are loaded when the job is deployed on the cluster.
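
For reference, one way to experiment with class loading is via flink-conf.yaml; a minimal sketch, assuming the Flink 1.7 configuration key 'classloader.resolve-order' (default 'child-first'):
classloader.resolve-order: parent-first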

I will loop in Gary, who might know more about the YARN behavior.

Regards,
Timo



Re: Building Flink from source according to vendor-specific version but causes protobuf conflict

Wei Sun
Hi Timo,

Good day!

Thank you for your help! This issue has been solved with the rebuilt Flink version. But I found that it does not work with the 'Apache Flink 1.7.1 only' binaries (the ones without bundled Hadoop), even if I configure the classpath with export HADOOP_CLASSPATH=`hadoop classpath`. I will check it later.
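
For completeness, the sequence I tried with the 'Apache Flink 1.7.1 only' binaries was roughly the following (a sketch, reusing the same submission command as above):
export HADOOP_CLASSPATH=`hadoop classpath`
bin/flink run -d -m yarn-cluster -yqu root.streaming -yn 5 -yjm 2048 -ytm 3096 -ynm CJVFormatter -ys 2 -c yidian.data.cjv.Formatter ./cjv-formatter-1.0-SNAPSHOT.jar --conf ./formatter.conf
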
Thanks again.

Best Regards
Wei

Re: Building Flink from source according to vendor-specific version but causes protobuf conflict

Gary Yao-3
Hi Wei,

Did you build Flink with Maven 3.2.5 as recommended in the documentation [1]?
Also, did you use the -Pvendor-repos flag to add the Cloudera repository when building [2]?
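
For example, the vendor-specific build described in [2] would roughly be (a sketch; please double-check the profile and property names against the 1.7 build docs):
mvn clean install -DskipTests -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.5.0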

Best,
Gary

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#build-flink
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/flinkDev/building.html#vendor-specific-versions
