BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Kyle Hamlin
Hello,

After moving to Flink 1.4.0 I'm getting the following error. I can't find anything online that addresses it. Is it a Hadoop dependency issue? Here are my project dependencies: 

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" % "flink-metrics-core" % flinkVersion,
  "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
  "org.apache.kafka" %% "kafka" % "0.10.0.1",
  "org.apache.avro" % "avro" % "1.7.7",
  "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
  "org.apache.parquet" % "parquet-avro" % "1.8.1",
  "io.confluent" % "kafka-avro-serializer" % "3.2.0",
  "org.apache.hadoop" % "hadoop-common" % "3.0.0"
)
Stacktrace:
Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
Using address localhost:6123 to connect to JobManager.
JobManager web interface address http://localhost:8082
Starting execution of program
Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
01/03/2018 14:20:52 Job execution switched to status RUNNING.
01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
... 13 more

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Stephan Ewen
Hi!

This does indeed look like a class-loading issue: "RpcEngine" and "ProtobufRpcEngine" seem to be loaded via different classloaders.

Can you try the following:

  - In your flink-conf.yaml, set classloader.resolve-order: parent-first

If that fixes the issue, then we can look at a way to make this seamless...
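
For reference, the relevant entry in conf/flink-conf.yaml would look something like this (the 1.4 default is child-first):

classloader.resolve-order: parent-first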

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Stephan Ewen
@Kyle:

Please also check whether you have any Hadoop classes in your user jar. There should be none; Hadoop should only be on the Flink classpath.
Fixing the project Maven setup (making sure Hadoop and Flink core dependencies are provided) should work.

To do that, you can, for example, use the latest quickstart template from Flink 1.4.
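
For an sbt build like yours, the equivalent would be marking the Hadoop dependency as Provided so it stays out of the assembled jar, e.g.:

// sketch: keep Hadoop out of the fat jar; the Flink runtime supplies it
"org.apache.hadoop" % "hadoop-common" % "3.0.0" % Provided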

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Aljoscha Krettek
I think this might be happening because partial Hadoop dependencies are in the user jar and the rest is only available from the Hadoop deps that come bundled with Flink. For example, I noticed that you have hadoop-common as a dependency, which probably ends up in your jar.
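
One way to check is to list the contents of the assembled jar, something along these lines (the jar name is just an example):

jar tf my-job-assembly.jar | grep '^org/apache/hadoop'

If that prints anything, those classes are coming from your jar rather than from Flink's own classpath.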

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Kyle Hamlin
I have hadoop-common in my build.sbt because my jar stopped compiling after moving from 1.3.2 to 1.4.0: org.apache.hadoop.fs.{FileSystem, Path} are no longer bundled with Flink, and I use them in my custom bucketer and in my writer to write Avro out to Parquet.
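
For context, here is a stripped-down sketch of the kind of bucketer I mean (EventType and its date field are simplified stand-ins for my actual types):

import org.apache.flink.streaming.connectors.fs.Clock
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer
import org.apache.hadoop.fs.Path

case class EventType(date: String)

// Puts each record into a directory named after its date, under the sink's base path.
class DateBucketer extends Bucketer[EventType] {
  override def getBucketPath(clock: Clock, basePath: Path, element: EventType): Path =
    new Path(basePath, element.date)
}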

I tried adding classloader.resolve-order: parent-first to my flink-conf.yaml but that didn't seem to work. I grepped my jar for "hadoop" and found the following:

org/apache/hadoop/*
org/apache/parquet/hadoop/*

After designating the hadoop-common dependency as "provided", only the org/apache/parquet/hadoop/* files show up. Additionally, the "RpcEngine" and "ProtobufRpcEngine" error doesn't show up anymore, just the following:

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more 

Moving the hadoop-common jar to Flink's lib/ directory also doesn't appear to help.

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Kyle Hamlin
Also, I'm not using HDFS; I'm trying to sink to S3.
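
Roughly, the sink setup looks like this (the bucket and prefix are made up; DateBucketer is the sketch from my previous mail):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink

val sink = new BucketingSink[EventType]("s3://my-bucket/some-prefix")
  .setBucketer(new DateBucketer)
// stream.addSink(sink), where "stream" is the Kafka-sourced DataStream[EventType]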

Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0

Kyle Hamlin
[hidden email] I set up my project using the template you suggested and I'm able to bucket and write files locally. I also want to test writing to S3, but I don't know how to configure the `sbt run` command to tell the FlinkMiniCluster to use the flink-s3-fs-hadoop-1.4.0.jar and a flink-conf.yaml.

When I try running my jar via the `flink run` command I get the same "java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink" error. How do I overcome this issue while still using the `flink run` command, so that I can use the flink-conf.yaml and flink-s3-fs-hadoop-1.4.0.jar?
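
Concretely, is the right approach something like this (paths and jar names abbreviated)?

cp $FLINK_HOME/opt/flink-s3-fs-hadoop-1.4.0.jar $FLINK_HOME/lib/
export FLINK_CONF_DIR=$FLINK_HOME/conf   # directory containing flink-conf.yaml
$FLINK_HOME/bin/flink run target/scala-2.11/my-job-assembly.jar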

