I am trying to read a file from HDFS, which is installed locally, but I am not able to. I tried both of the approaches below, and each time the program ends with "Process finished with exit code 239." Any help would be appreciated.
public class Processor {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//env.setParallelism(1);
Job job = Job.getInstance();
HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);
TextInputFormat.addInputPath(job, new Path("hdfs://localhost:9000/newfolder/testdata1"));
DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
words.print();
env.execute("Processor");
}
public static final class Tokenizer extends RichFlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
@Override
public void flatMap(Tuple2<LongWritable, Text> value, org.apache.flink.util.Collector<Tuple2<String, Integer>> out) throws Exception {
// normalize and split the line
String line = value.f1.toString();
String[] tokens = line.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2<String, Integer>(token, 1));
}
}
}
}
}
/**
 * Alternative entry point: reads the HDFS text file via
 * {@code HadoopInputs.readHadoopFile}, keeps only the line text of each
 * {@code (offset, line)} record and prints it.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the input cannot be set up or the Flink program fails
 */
public static void main(String[] args) throws Exception {
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //env.setParallelism(1);

    DataSet<Tuple2<LongWritable, Text>> input = env.createInput(
            HadoopInputs.readHadoopFile(
                    new TextInputFormat(),
                    LongWritable.class,
                    Text.class,
                    "hdfs://localhost:9000/newfolder/testdata1"));

    // Drop the key and convert the Hadoop Text value to a plain String.
    DataSet<String> stringInput = input.map(new MapFunction<Tuple2<LongWritable, Text>, String>() {
        @Override
        public String map(Tuple2<LongWritable, Text> longWritableTextTuple2) throws Exception {
            return longWritableTextTuple2.f1.toString();
        }
    });

    // DataSet.print() triggers execution of the program by itself. A following
    // env.execute() would find no new sink and fail with "No new data sinks
    // have been defined since the last execution", so it is intentionally
    // not called here.
    stringInput.print();
}