Help with flink hdfs sink


Help with flink hdfs sink

Nick Bendtner
Hi guys,
I am using Flink version 1.7.2.
I am trying to write to an HDFS sink from my Flink job. I have set up HADOOP_HOME. Here is the debug log:

2020-03-19 18:59:34,316 DEBUG org.apache.flink.runtime.util.HadoopUtils                     - Cannot find hdfs-default configuration-file path in Flink config.
2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils                     - Cannot find hdfs-site configuration-file path in Flink config.
2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils                     - Adding /home/was/HDFSConf/conf/core-site.xml to hadoop configuration
2020-03-19 18:59:34,317 DEBUG org.apache.flink.runtime.util.HadoopUtils                     - Adding /home/was/HDFSConf/conf/hdfs-site.xml to hadoop configuration
2020-03-19 18:59:34,344 INFO  org.apache.flink.runtime.security.modules.HadoopModule        - Hadoop user set to kafka (auth:KERBEROS)

This is what my streaming file sink code looks like:

val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("hdfs://tmp/auditlog/"), new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(
    DefaultRollingPolicy.create()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
      .withMaxPartSize(1024 * 1024 * 1024)
      .build())
  .build()

result.addSink(sink).name("HDFSSink")

When I run the job, I get this error stack trace:
 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Sink: HDFSSink (1/1) (27b62d6294da47491041d750daf421a0) switched from RUNNING to FAILED.
java.io.IOException: Cannot instantiate file system for URI: hdfs://tmp/auditlog
        at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
        at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:399)
        at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:318)
        at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$RowFormatBuilder.createBuckets(StreamingFileSink.java:242)
        at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
        at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: tmp
        at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
        at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
        at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:687)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:628)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)

Why is it trying to connect to a host named "tmp"? Is it not supposed to get the namenodes from core-site.xml and hdfs-site.xml?
Can you please help with the correct way to configure the HDFS sink?

Best,
Nick. 


Re: Help with flink hdfs sink

Jingsong Li
Hi Nick,

You can try new Path("hdfs:///tmp/auditlog/"). Note the additional / after "hdfs://": "hdfs://" is only the protocol prefix, so the segment that follows it is parsed as the namenode host rather than as part of the path.
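
As a quick illustration, here is a minimal Scala sketch (using only java.net.URI from the standard library) of how the two forms are parsed:

import java.net.URI

// With "hdfs://tmp/...", the first segment after the scheme is the authority (host):
val bad = new URI("hdfs://tmp/auditlog")
println(bad.getHost)  // "tmp"  -> Hadoop tries to resolve a namenode named "tmp"
println(bad.getPath)  // "/auditlog"

// With "hdfs:///tmp/...", the authority is empty and everything after it is the path:
val good = new URI("hdfs:///tmp/auditlog")
println(good.getHost) // null  -> the filesystem falls back to the configured default
println(good.getPath) // "/tmp/auditlog"

That is exactly the "java.net.UnknownHostException: tmp" in your stack trace.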

Best,
Jingsong Lee

Re: Help with flink hdfs sink

Yang Wang
I think Jingsong is right: you are missing a slash in your HDFS path.

An HDFS path usually looks like "hdfs://nameservice/path/of/your/file",
and the nameservice can be omitted if you want to use the defaultFS
configured in core-site.xml.
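
A minimal sketch of the two valid forms ("mycluster" is a hypothetical nameservice; substitute the value of dfs.nameservices from your own hdfs-site.xml):

import org.apache.flink.core.fs.Path

// Explicit nameservice, resolved through the settings in hdfs-site.xml:
val explicitPath = new Path("hdfs://mycluster/tmp/auditlog/")

// Nameservice omitted, resolved through fs.defaultFS in core-site.xml:
val defaultFsPath = new Path("hdfs:///tmp/auditlog/")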


Best,
Yang

Re: Help with flink hdfs sink

Nick Bendtner
Thank you so much, guys. I used "hdfs://nameservice/path/of/your/file" and it works fine for me now.
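
For anyone who hits the same issue, here is roughly what the fixed sink looks like. This is a sketch, not verbatim from my job: "mycluster" is a hypothetical nameservice (take yours from hdfs-site.xml, or use "hdfs:///tmp/auditlog/" to rely on fs.defaultFS):

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy

val sink: StreamingFileSink[String] = StreamingFileSink
  .forRowFormat(new Path("hdfs://mycluster/tmp/auditlog/"), // nameservice + absolute path
                new SimpleStringEncoder[String]("UTF-8"))
  .withRollingPolicy(
    DefaultRollingPolicy.create()
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
      .withMaxPartSize(1024 * 1024 * 1024) // 1 GB
      .build())
  .build()

result.addSink(sink).name("HDFSSink")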

Best,
Nick
