flink read hdfs file error

Posted by 韩宁宁 on
URL: http://deprecated-apache-flink-user-mailing-list-archive.369.s1.nabble.com/flink-read-hdfs-file-error-tp17899.html

Dear All
       I have a question about  Flink&Hadoop.
        I want to read the files on HDFS by flink,but I encountered an error as follows,can you please advise the solution about this problem. It will be much appreciated.:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:/data/home/fbi/hanningning/flink-hdfs/target/flink-hdfs.jar!/reference.conf: 804: Could not resolve substitution to a value: ${akka.stream.materializer}
at com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
.....


====================my code===========================
public class App {

public static void main(String[] args) throws Exception {

final String inputPath = args[0]//hdfs file path;
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

HadoopInputFormat<LongWritable,Text> hadoopInputFormat =
new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(),LongWritable.class,
Text.class,new JobConf());
TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));

DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);

text.print();

env.execute("read hdfs by flink test");
}

}
==========================maven dependencies=======================================
<properties>
<flink.version>1.4.0</flink.version>
<log4j.version>1.2.17</log4j.version>
<akka.version>2.4.20</akka.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
</dependencies>

Best wishes


Thanks