BucketingSink broken in flink 1.4.0 ?


BucketingSink broken in flink 1.4.0 ?

jelmer
Hi, I am trying to convert some jobs from Flink 1.3.2 to Flink 1.4.0.

But I am running into the issue that the BucketingSink will always try to connect to hdfs://localhost:12345/ instead of the HDFS URL I have specified in the constructor.

If I look at the code at

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L1125

it tries to create the Hadoop filesystem like this:

final org.apache.flink.core.fs.FileSystem flinkFs = org.apache.flink.core.fs.FileSystem.get(path.toUri());
final FileSystem hadoopFs = (flinkFs instanceof HadoopFileSystem) ?
((HadoopFileSystem) flinkFs).getHadoopFileSystem() : null;

But FileSystem.get will always return a SafetyNetWrapperFileSystem, so the instanceof check will never indicate that it is a Hadoop filesystem.

Am I missing something, or is this a bug? If it is a bug, what would be the correct fix? I guess replacing FileSystem.get with FileSystem.getUnguardedFileSystem would fix it, but I am afraid I lack the context to know whether that would be safe.
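The failure mode can be illustrated with simplified stand-in classes (hypothetical names, not Flink's real implementations): a decorator that wraps the concrete filesystem defeats an instanceof check on the concrete type unless the delegate is unwrapped first.

```java
// Simplified model of the issue. Fs, HadoopFs, and SafetyNetWrapperFs are
// stand-ins for Flink's FileSystem, HadoopFileSystem, and
// SafetyNetWrapperFileSystem; they are not the real classes.
interface Fs {}

class HadoopFs implements Fs {} // the concrete type the sink wants to detect

class SafetyNetWrapperFs implements Fs { // decorator, as returned by FileSystem.get()
    private final Fs delegate;
    SafetyNetWrapperFs(Fs delegate) { this.delegate = delegate; }
    Fs getWrappedDelegate() { return delegate; }
}

public class InstanceofDemo {
    public static void main(String[] args) {
        Fs unguarded = new HadoopFs();                   // what an unguarded lookup returns
        Fs guarded = new SafetyNetWrapperFs(unguarded);  // what the guarded lookup returns

        System.out.println(unguarded instanceof HadoopFs); // true
        System.out.println(guarded instanceof HadoopFs);   // false: the check never matches

        // Unwrapping the delegate restores the expected result:
        Fs inner = (guarded instanceof SafetyNetWrapperFs)
                ? ((SafetyNetWrapperFs) guarded).getWrappedDelegate()
                : guarded;
        System.out.println(inner instanceof HadoopFs);     // true
    }
}
```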
Re: BucketingSink broken in flink 1.4.0 ?

Chesnay Schepler
Your analysis looks correct; the code in question will never properly detect Hadoop file systems. I'll open a JIRA.

Your suggestion to replace it with getUnguardedFileSystem() was my first instinct as well.

Good job debugging this.



Re: BucketingSink broken in flink 1.4.0 ?

Kyle Hamlin
I'm having similar issues after moving from 1.3.2 to 1.4.0.

I'm not actually using HDFS as my sink. I'll be using S3 as my final sink, but I get the following error even when I've given a local file path to the BucketingSink:

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more
Caused by: java.lang.ClassCastException








Re: BucketingSink broken in flink 1.4.0 ?

Stephan Ewen
Re-posting the solution here from other threads:

You can fix this by:

  - Removing all Hadoop dependencies from your user jar

Hope that helps.
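A quick way to check whether Hadoop classes are in fact being bundled into the user jar (hypothetical commands and jar name, assuming a Maven build; adapt to your build tool):

```shell
# List Hadoop artifacts pulled in (possibly transitively) by the build
mvn dependency:tree | grep -i hadoop

# Inspect the assembled job jar directly for bundled Hadoop classes
jar tf target/my-job.jar | grep '^org/apache/hadoop'
```

If the second command prints anything, Hadoop classes are shipping inside the user jar and can shadow the versions provided by the Flink runtime.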


Re: BucketingSink broken in flink 1.4.0 ?

Stephan Ewen
Hi!

Thanks for diagnosing this - the fix you suggested is correct.

Can you still share some of the logs indicating where it fails? The reason is that the fallback code path (using "hdfs://localhost:12345") should not really try to connect to a local HDFS, but simply use this as a placeholder URI to get the correct configuration for Hadoop's file system.

Best,
Stephan
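To illustrate the placeholder point: parsing a URI like hdfs://localhost:12345/ involves no network I/O at all; only its components (chiefly the scheme) need to be read to select a filesystem factory. A minimal sketch with the JDK's URI class:

```java
import java.net.URI;

public class PlaceholderUriDemo {
    public static void main(String[] args) {
        // Parsing the placeholder URI touches no network; it only exposes
        // the components a factory lookup would consult.
        URI placeholder = URI.create("hdfs://localhost:12345/");

        System.out.println(placeholder.getScheme()); // hdfs
        System.out.println(placeholder.getHost());   // localhost
        System.out.println(placeholder.getPort());   // 12345
    }
}
```

An actual connection attempt would only happen later, if and when a filesystem instantiated from this URI performs an operation against it.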

