Re: flink read hdfs file error

Posted by Aljoscha Krettek on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/flink-read-hdfs-file-error-tp17899p18018.html

Hi,

It seems you are using Akka in your user program, and the reference.conf in your JAR is clashing with the file of the same name from Flink. What is the reason for having Akka (or reference.conf) in your user JAR?
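
One way to see where reference.conf is coming from is to list every copy that is visible on the classpath. The following is only a minimal sketch under some assumptions: the class name ReferenceConfLocator and the method locate are made up for illustration and are not part of Flink or Akka. It relies only on the standard ClassLoader.getResources call.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink or Akka.
public class ReferenceConfLocator {

    /** Returns the URL of every copy of the named resource that the class loader can see. */
    public static List<URL> locate(ClassLoader loader, String resourceName) throws IOException {
        // getResources returns all matches, not just the first one found.
        return Collections.list(loader.getResources(resourceName));
    }

    public static void main(String[] args) throws IOException {
        List<URL> copies =
                locate(ReferenceConfLocator.class.getClassLoader(), "reference.conf");
        System.out.println("Copies of reference.conf found: " + copies.size());
        for (URL url : copies) {
            // Each URL names the jar (or directory) that ships this copy.
            System.out.println("  " + url);
        }
    }
}
```

Run it with the same classpath as the job. More than one URL means several jars ship their own reference.conf. It only reports where the copies live; it cannot tell whether one copy overwrote another while a fat jar was being built.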

Best,
Aljoscha

On 22. Jan 2018, at 11:09, 韩宁宁 <[hidden email]> wrote:

Dear all,

I have a question about Flink and Hadoop.
I want to read files on HDFS with Flink, but I encountered the error below. Could you please advise how to solve this problem? It would be much appreciated.
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:/data/home/fbi/hanningning/flink-hdfs/target/flink-hdfs.jar!/reference.conf: 804: Could not resolve substitution to a value: ${akka.stream.materializer}
at com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
.....


====================my code===========================
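import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class App {

    public static void main(String[] args) throws Exception {

        final String inputPath = args[0]; // HDFS file path
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap Hadoop's TextInputFormat (mapred API) so Flink reads (byte offset, line) pairs.
        HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
                new HadoopInputFormat<LongWritable, Text>(
                        new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
        TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));

        DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);

        // print() already triggers execution in the DataSet API, so env.execute() is not
        // called afterwards; it would fail because no new sink has been defined.
        text.print();
    }

}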
==========================maven dependencies=======================================
<properties>
    <flink.version>1.4.0</flink.version>
    <log4j.version>1.2.17</log4j.version>
    <akka.version>2.4.20</akka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>

Best wishes

Thanks