flink read hdfs file error

韩宁宁
Dear All,
       I have a question about Flink and Hadoop.
       I want to read files on HDFS with Flink, but I encountered the following error. Can you please advise on a solution? It would be much appreciated:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:/data/home/fbi/hanningning/flink-hdfs/target/flink-hdfs.jar!/reference.conf: 804: Could not resolve substitution to a value: ${akka.stream.materializer}
at com.typesafe.config.impl.ConfigReference.resolveSubstitutions(ConfigReference.java:108)
at com.typesafe.config.impl.ResolveContext.realResolve(ResolveContext.java:179)
at com.typesafe.config.impl.ResolveContext.resolve(ResolveContext.java:142)
at com.typesafe.config.impl.SimpleConfigObject$ResolveModifier.modifyChildMayThrow(SimpleConfigObject.java:379)
at com.typesafe.config.impl.SimpleConfigObject.modifyMayThrow(SimpleConfigObject.java:312)
at com.typesafe.config.impl.SimpleConfigObject.resolveSubstitutions(SimpleConfigObject.java:398)
... (the same ResolveContext/SimpleConfigObject frames repeat) ...


====================my code===========================
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class App {

    public static void main(String[] args) throws Exception {

        final String inputPath = args[0]; // HDFS file path
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Wrap the Hadoop mapred TextInputFormat so Flink can read it
        HadoopInputFormat<LongWritable, Text> hadoopInputFormat =
                new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(),
                        LongWritable.class, Text.class, new JobConf());
        TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));

        DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);

        // print() already triggers execution of the plan, so a following
        // env.execute() would fail with "No new data sinks have been defined"
        text.print();

        // env.execute("read hdfs by flink test");
    }
}
==========================maven dependencies=======================================
<properties>
    <flink.version>1.4.0</flink.version>
    <log4j.version>1.2.17</log4j.version>
    <akka.version>2.4.20</akka.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>
</dependencies>

Best wishes


Thanks



Re: flink read hdfs file error

Aljoscha Krettek
Hi,

It seems you are using Akka in your user program, and the reference.conf in your jar is clashing with the same file from Flink. What is the reason for having Akka (or a reference.conf) in your user jar?
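
If Akka ends up in the fat jar transitively, the usual fix is not to drop reference.conf but to merge all reference.conf files when shading, so substitutions like ${akka.stream.materializer} can resolve. A minimal sketch of the relevant maven-shade-plugin configuration (goes under <build><plugins> in the POM; the plugin version is only an example):

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.1.0</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <!-- Append all reference.conf files into one instead of
                         keeping only the first one found on the classpath -->
                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                        <resource>reference.conf</resource>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>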

Best,
Aljoscha

On 22. Jan 2018, at 11:09, 韩宁宁 <[hidden email]> wrote:

[quoted text trimmed]


Re: flink read hdfs file error

Or Sher
Hi,
Did you ever get to solve this issue? I'm getting the same error.

On 1.3.2 I used to run the fat jar standalone, without any job submission, and it worked just fine.
It looked like it used an embedded MiniCluster.

After just changing the dependencies to 1.4.0, we started getting these errors.

Should this execution mode also be supported in 1.4.0?
I couldn't find anything in the docs about it.
They always say to run the job using "flink run", which depends on an already running cluster.
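
For reference, a minimal sketch of what that standalone run relied on, assuming (as in 1.3.2) that getExecutionEnvironment() falls back to an embedded local cluster when the jar's main() is launched directly (class name is just an example):

import org.apache.flink.api.java.ExecutionEnvironment;

public class LocalRun {

    public static void main(String[] args) throws Exception {
        // Launched via java -jar, this returns a local environment backed by
        // an embedded mini cluster instead of connecting to a remote cluster.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Local execution can also be forced explicitly:
        // ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        env.fromElements("a", "b", "c").print();
    }
}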

On Fri, Jan 26, 2018 at 12:59 PM Aljoscha Krettek <[hidden email]> wrote:
[quoted text trimmed]


Re: flink read hdfs file error

Aljoscha Krettek
Hi,

What is the exact problem you're seeing? Could you please post your POM file and also a listing of all the files in your user jar?
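
For example, running "jar tf target/flink-hdfs.jar | grep reference.conf" (path taken from the stack trace earlier in the thread) will show whether a reference.conf was packaged into the fat jar.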

Best,
Aljoscha

On 15. Feb 2018, at 11:12, Or Sher <[hidden email]> wrote:

[quoted text trimmed]